Merge remote-tracking branch 'couchbase/unstable' into HEAD
reverting fix for MB-50871

Change-Id: Ic56a6eef0a7fce27222564486b557c1acfa1f340
jeelanp2003 committed Feb 20, 2022
2 parents aaa033e + 05770cc commit 1632117
Showing 3 changed files with 154 additions and 170 deletions.
7 changes: 0 additions & 7 deletions consumer/bucket_ops.go
@@ -512,13 +512,6 @@ var addOwnershipHistorySRRCallback = func(args ...interface{}) error {
c := args[0].(*Consumer)
vbKey := args[1].(common.Key)
operr := args[2].(*error)

if atomic.LoadUint32(&c.isTerminateRunning) == 1 {
logging.Tracef("%s [%s:%s:%d] Exiting as worker is terminating",
logPrefix, c.workerName, c.tcpPort, c.Pid())
return nil
}

upsertOptions := &gocb.UpsertSpecOptions{CreatePath: true}

retrySRRUpdate:
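The seven deleted lines above drop an early-exit guard: addOwnershipHistorySRRCallback no longer checks c.isTerminateRunning before entering its upsert retry loop, so the callback now always attempts the metadata update even while the worker is shutting down. A minimal, self-contained sketch of the guard pattern being removed — worker and upsertCallback are illustrative stand-ins, not the eventing consumer types:

package main

import (
	"fmt"
	"sync/atomic"
)

// worker is a stand-in for the eventing Consumer; only the field needed for
// the guard pattern is modeled here.
type worker struct {
	isTerminateRunning uint32 // set to 1 via atomic.StoreUint32 during teardown
}

// upsertCallback mirrors the shape of addOwnershipHistorySRRCallback: with the
// guard (deleted by this commit) it returns early once the worker is
// terminating; without it, it always proceeds to the upsert retry loop.
func upsertCallback(w *worker, guardEnabled bool) error {
	if guardEnabled && atomic.LoadUint32(&w.isTerminateRunning) == 1 {
		fmt.Println("exiting early: worker is terminating")
		return nil
	}
	fmt.Println("performing ownership-history upsert (retried on CAS mismatch)")
	return nil
}

func main() {
	w := &worker{}
	atomic.StoreUint32(&w.isTerminateRunning, 1)

	upsertCallback(w, true)  // pre-revert behaviour: skips the upsert
	upsertCallback(w, false) // post-revert behaviour: always attempts the upsert
}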
309 changes: 153 additions & 156 deletions consumer/process_events.go
@@ -125,6 +125,159 @@ func (c *Consumer) processDCPEvents() {
c.processAndSendDcpDelOrExpMessage(e, functionInstanceID, false)
c.dcpExpiryCounter++

case mcd.DCP_STREAMREQ:

logging.Infof("%s [%s:%s:%d] vb: %d got STREAMREQ status: %v",
logPrefix, c.workerName, c.tcpPort, c.Pid(), e.VBucket, e.Status)

retryCheckMetadataUpdated:
if metadataUpdated, ok := c.vbProcessingStats.getVbStat(e.VBucket, "vb_stream_request_metadata_updated").(bool); ok {
logging.Infof("%s [%s:%s:%d] vb: %d STREAMREQ metadataUpdated: %t",
logPrefix, c.workerName, c.tcpPort, c.Pid(), e.VBucket, metadataUpdated)
if metadataUpdated {
c.vbProcessingStats.updateVbStat(e.VBucket, "vb_stream_request_metadata_updated", false)
} else {
time.Sleep(time.Second)
goto retryCheckMetadataUpdated
}
} else {
logging.Infof("%s [%s:%s:%d] vb: %d STREAMREQ metadataUpdated not found",
logPrefix, c.workerName, c.tcpPort, c.Pid(), e.VBucket)
time.Sleep(time.Second)
goto retryCheckMetadataUpdated
}

if e.Status == mcd.SUCCESS {

vbFlog := &vbFlogEntry{statusCode: e.Status, streamReqRetry: false, vb: e.VBucket}

var vbBlob vbucketKVBlob
var cas gocb.Cas

vbKey := fmt.Sprintf("%s::vb::%d", c.app.AppName, e.VBucket)

var operr error
err := util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, getOpCallback,
c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, &operr, false)
if err == common.ErrRetryTimeout {
logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid())
return
} else if operr == common.ErrEncryptionLevelChanged {
logging.Errorf("%s [%s:%s:%d] Skipping current STREAMREQ event as change in encryption level was detected during bootstrap", logPrefix, c.workerName, c.tcpPort, c.Pid())
continue
}

vbuuid, seqNo, err := e.FailoverLog.Latest()
if err != nil {
logging.Errorf("%s [%s:%s:%d] vb: %d STREAMREQ Inserting entry: %#v to vbFlogChan."+
" Failure to get latest failover log, err: %v",
logPrefix, c.workerName, c.tcpPort, c.Pid(), e.VBucket, vbFlog, err)
c.vbFlogChan <- vbFlog
continue
}

c.vbProcessingStats.updateVbStat(e.VBucket, "vb_uuid", vbuuid)

// Update metadata with latest vbuuid and rollback seq no
vbBlob.AssignedWorker = c.ConsumerName()
vbBlob.CurrentVBOwner = c.HostPortAddr()
vbBlob.DCPStreamStatus = dcpStreamRunning
vbBlob.LastSeqNoProcessed = seqNo
vbBlob.NodeUUID = c.uuid
vbBlob.VBuuid = vbuuid
vbBlob.FailoverLog = *e.FailoverLog

var startSeqNo uint64
if seqNo, ok := c.vbProcessingStats.getVbStat(e.VBucket, "last_processed_seq_no").(uint64); ok {
startSeqNo = seqNo
}

c.sendUpdateProcessedSeqNo(e.VBucket, startSeqNo)

if val, ok := c.vbProcessingStats.getVbStat(e.VBucket, "bootstrap_stream_req_done").(bool); ok && !val {
c.vbProcessingStats.updateVbStat(e.VBucket, "bootstrap_stream_req_done", true)
vbBlob.BootstrapStreamReqDone = true
logging.Infof("%s [%s:%s:%d] vb: %d updated bootstrap done flag to: %t",
logPrefix, c.workerName, c.tcpPort, c.Pid(), e.VBucket, vbBlob.BootstrapStreamReqDone)
}

err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, addOwnershipHistorySRSCallback,
c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &operr)
if err == common.ErrRetryTimeout {
logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid())
return
} else if operr == common.ErrEncryptionLevelChanged {
continue
}

c.vbProcessingStats.updateVbStat(e.VBucket, "assigned_worker", c.ConsumerName())
c.vbProcessingStats.updateVbStat(e.VBucket, "current_vb_owner", c.HostPortAddr())
c.vbProcessingStats.updateVbStat(e.VBucket, "dcp_stream_status", dcpStreamRunning)
c.vbProcessingStats.updateVbStat(e.VBucket, "node_uuid", c.uuid)

c.vbProcessingStats.updateVbStat(e.VBucket, "ever_owned_vb", true)
c.vbProcessingStats.updateVbStat(e.VBucket, "host_name", c.HostPortAddr())
c.vbProcessingStats.updateVbStat(e.VBucket, "last_checkpointed_seq_no", startSeqNo)
c.vbProcessingStats.updateVbStat(e.VBucket, "timestamp", time.Now().Format(time.RFC3339))
c.vbProcessingStats.updateVbStat(e.VBucket, "worker_name", c.ConsumerName())

c.vbProcessingStats.updateVbStat(e.VBucket, "dcp_stream_requested_node_uuid", c.NodeUUID())
c.vbProcessingStats.updateVbStat(e.VBucket, "dcp_stream_requested_worker", c.ConsumerName())

c.vbProcessingStats.updateVbStat(e.VBucket, "vb_filter_ack_received", false)
c.vbProcessingStats.updateVbStat(e.VBucket, "failover_log", vbBlob.FailoverLog)

if !c.checkIfCurrentConsumerShouldOwnVb(e.VBucket) {
c.Lock()
c.vbsRemainingToClose = append(c.vbsRemainingToClose, e.VBucket)
c.Unlock()

c.filterVbEventsRWMutex.Lock()
c.filterVbEvents[e.VBucket] = struct{}{}
c.filterVbEventsRWMutex.Unlock()
}

logging.Infof("%s [%s:%s:%d] vb: %d STREAMREQ Inserting entry: %#v to vbFlogChan",
logPrefix, c.workerName, c.tcpPort, c.Pid(), e.VBucket, vbFlog)
c.vbFlogChan <- vbFlog
continue
}

if e.Status == mcd.KEY_EEXISTS {
vbFlog := &vbFlogEntry{statusCode: e.Status, streamReqRetry: false, vb: e.VBucket}

logging.Infof("%s [%s:%s:%d] vb: %d STREAMREQ Inserting entry: %#v to vbFlogChan",
logPrefix, c.workerName, c.tcpPort, c.Pid(), e.VBucket, vbFlog)
c.vbFlogChan <- vbFlog
continue
}

if e.Status != mcd.SUCCESS {

vbFlog := &vbFlogEntry{
flog: e.FailoverLog,
seqNo: e.Seqno,
statusCode: e.Status,
streamReqRetry: true,
vb: e.VBucket,
}

vbKey := fmt.Sprintf("%s::vb::%d", c.app.AppName, e.VBucket)

var operr error
err := util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, addOwnershipHistorySRFCallback,
c, c.producer.AddMetadataPrefix(vbKey), &operr)
if err == common.ErrRetryTimeout {
logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid())
return
} else if operr == common.ErrEncryptionLevelChanged {
continue
}

logging.Infof("%s [%s:%s:%d] vb: %d STREAMREQ Failed. Inserting entry: %#v to vbFlogChan",
logPrefix, c.workerName, c.tcpPort, c.Pid(), e.VBucket, vbFlog)
c.vbFlogChan <- vbFlog
}
case mcd.DCP_STREAMEND:
logging.Infof("%s [%s:%s:%d] vb: %d got STREAMEND", logPrefix, c.workerName, c.tcpPort, c.Pid(), e.VBucket)
c.vbProcessingStats.updateVbStat(e.VBucket, "vb_stream_request_metadata_updated", false)
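The DCP_STREAMREQ case restored above blocks on the vb_stream_request_metadata_updated stat with a goto-based poll, sleeping one second between attempts until the flag is set and then resetting it. A stand-alone sketch of the same wait written as a plain loop — vbStats is an illustrative stand-in for the consumer's vbProcessingStats, and the tick interval is shortened for the example:

package main

import (
	"fmt"
	"sync"
	"time"
)

// vbStats models just enough of vbProcessingStats for the example:
// per-vbucket key/value stats behind a mutex.
type vbStats struct {
	mu    sync.Mutex
	stats map[uint16]map[string]interface{}
}

func (s *vbStats) get(vb uint16, key string) interface{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.stats[vb][key]
}

func (s *vbStats) set(vb uint16, key string, val interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.stats[vb] == nil {
		s.stats[vb] = make(map[string]interface{})
	}
	s.stats[vb][key] = val
}

// waitForMetadataUpdated expresses the retryCheckMetadataUpdated goto loop as a
// for loop: poll each tick until the flag is present and true, then reset it so
// the next STREAMREQ for this vbucket waits again.
func waitForMetadataUpdated(s *vbStats, vb uint16, tick time.Duration) {
	for {
		if updated, ok := s.get(vb, "vb_stream_request_metadata_updated").(bool); ok && updated {
			s.set(vb, "vb_stream_request_metadata_updated", false)
			return
		}
		time.Sleep(tick)
	}
}

func main() {
	s := &vbStats{stats: make(map[uint16]map[string]interface{})}

	// In the real consumer the stream-request path flips this flag; here a
	// goroutine stands in for it.
	go func() {
		time.Sleep(30 * time.Millisecond)
		s.set(0, "vb_stream_request_metadata_updated", true)
	}()

	waitForMetadataUpdated(s, 0, 10*time.Millisecond)
	fmt.Println("metadata updated, STREAMREQ handling can proceed")
}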
@@ -510,162 +663,6 @@ func (c *Consumer) addToAggChan(dcpFeed *couchbase.DcpFeed) {
return
}

switch e.Opcode {
case mcd.DCP_STREAMREQ:
logging.Infof("%s [%s:%s:%d] vb: %d got STREAMREQ status: %v",
logPrefix, c.workerName, c.tcpPort, c.Pid(), e.VBucket, e.Status)

retryCheckMetadataUpdated:
if metadataUpdated, ok := c.vbProcessingStats.getVbStat(e.VBucket, "vb_stream_request_metadata_updated").(bool); ok {
logging.Infof("%s [%s:%s:%d] vb: %d STREAMREQ metadataUpdated: %t",
logPrefix, c.workerName, c.tcpPort, c.Pid(), e.VBucket, metadataUpdated)
if metadataUpdated {
c.vbProcessingStats.updateVbStat(e.VBucket, "vb_stream_request_metadata_updated", false)
} else {
time.Sleep(time.Second)
goto retryCheckMetadataUpdated
}
} else {
logging.Infof("%s [%s:%s:%d] vb: %d STREAMREQ metadataUpdated not found",
logPrefix, c.workerName, c.tcpPort, c.Pid(), e.VBucket)
time.Sleep(time.Second)
goto retryCheckMetadataUpdated
}

if e.Status == mcd.SUCCESS {
vbFlog := &vbFlogEntry{statusCode: e.Status, streamReqRetry: false, vb: e.VBucket}

var vbBlob vbucketKVBlob
var cas gocb.Cas

vbKey := fmt.Sprintf("%s::vb::%d", c.app.AppName, e.VBucket)

var operr error
err := util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, getOpCallback,
c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, &operr, false)
if err == common.ErrRetryTimeout {
logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid())
continue
} else if operr == common.ErrEncryptionLevelChanged {
logging.Errorf("%s [%s:%s:%d] Skipping current STREAMREQ event as change in encryption level was detected during bootstrap", logPrefix, c.workerName, c.tcpPort, c.Pid())
continue
}

vbuuid, seqNo, err := e.FailoverLog.Latest()
if err != nil {
logging.Errorf("%s [%s:%s:%d] vb: %d STREAMREQ Inserting entry: %#v to vbFlogChan."+
" Failure to get latest failover log, err: %v",
logPrefix, c.workerName, c.tcpPort, c.Pid(), e.VBucket, vbFlog, err)
c.vbFlogChan <- vbFlog
continue
}

c.vbProcessingStats.updateVbStat(e.VBucket, "vb_uuid", vbuuid)

// Update metadata with latest vbuuid and rollback seq no
vbBlob.AssignedWorker = c.ConsumerName()
vbBlob.CurrentVBOwner = c.HostPortAddr()
vbBlob.DCPStreamStatus = dcpStreamRunning
vbBlob.LastSeqNoProcessed = seqNo
vbBlob.NodeUUID = c.uuid
vbBlob.VBuuid = vbuuid
vbBlob.FailoverLog = *e.FailoverLog

var startSeqNo uint64
if seqNo, ok := c.vbProcessingStats.getVbStat(e.VBucket, "last_processed_seq_no").(uint64); ok {
startSeqNo = seqNo
}

c.sendUpdateProcessedSeqNo(e.VBucket, startSeqNo)

if val, ok := c.vbProcessingStats.getVbStat(e.VBucket, "bootstrap_stream_req_done").(bool); ok && !val {
c.vbProcessingStats.updateVbStat(e.VBucket, "bootstrap_stream_req_done", true)
vbBlob.BootstrapStreamReqDone = true
logging.Infof("%s [%s:%s:%d] vb: %d updated bootstrap done flag to: %t",
logPrefix, c.workerName, c.tcpPort, c.Pid(), e.VBucket, vbBlob.BootstrapStreamReqDone)
}

err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, addOwnershipHistorySRSCallback,
c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &operr)
if err == common.ErrRetryTimeout {
logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid())
continue
} else if operr == common.ErrEncryptionLevelChanged {
continue
}

c.vbProcessingStats.updateVbStat(e.VBucket, "assigned_worker", c.ConsumerName())
c.vbProcessingStats.updateVbStat(e.VBucket, "current_vb_owner", c.HostPortAddr())
c.vbProcessingStats.updateVbStat(e.VBucket, "dcp_stream_status", dcpStreamRunning)
c.vbProcessingStats.updateVbStat(e.VBucket, "node_uuid", c.uuid)

c.vbProcessingStats.updateVbStat(e.VBucket, "ever_owned_vb", true)
c.vbProcessingStats.updateVbStat(e.VBucket, "host_name", c.HostPortAddr())
c.vbProcessingStats.updateVbStat(e.VBucket, "last_checkpointed_seq_no", startSeqNo)
c.vbProcessingStats.updateVbStat(e.VBucket, "timestamp", time.Now().Format(time.RFC3339))
c.vbProcessingStats.updateVbStat(e.VBucket, "worker_name", c.ConsumerName())

c.vbProcessingStats.updateVbStat(e.VBucket, "dcp_stream_requested_node_uuid", c.NodeUUID())
c.vbProcessingStats.updateVbStat(e.VBucket, "dcp_stream_requested_worker", c.ConsumerName())

c.vbProcessingStats.updateVbStat(e.VBucket, "vb_filter_ack_received", false)
c.vbProcessingStats.updateVbStat(e.VBucket, "failover_log", vbBlob.FailoverLog)

if !c.checkIfCurrentConsumerShouldOwnVb(e.VBucket) {
c.Lock()
c.vbsRemainingToClose = append(c.vbsRemainingToClose, e.VBucket)
c.Unlock()

c.filterVbEventsRWMutex.Lock()
c.filterVbEvents[e.VBucket] = struct{}{}
c.filterVbEventsRWMutex.Unlock()
}

logging.Infof("%s [%s:%s:%d] vb: %d STREAMREQ Inserting entry: %#v to vbFlogChan",
logPrefix, c.workerName, c.tcpPort, c.Pid(), e.VBucket, vbFlog)
c.vbFlogChan <- vbFlog
continue
}

if e.Status == mcd.KEY_EEXISTS {
vbFlog := &vbFlogEntry{statusCode: e.Status, streamReqRetry: false, vb: e.VBucket}

logging.Infof("%s [%s:%s:%d] vb: %d STREAMREQ Inserting entry: %#v to vbFlogChan",
logPrefix, c.workerName, c.tcpPort, c.Pid(), e.VBucket, vbFlog)
c.vbFlogChan <- vbFlog
continue
}

if e.Status != mcd.SUCCESS {
vbFlog := &vbFlogEntry{
flog: e.FailoverLog,
seqNo: e.Seqno,
statusCode: e.Status,
streamReqRetry: true,
vb: e.VBucket,
}

vbKey := fmt.Sprintf("%s::vb::%d", c.app.AppName, e.VBucket)

var operr error
err := util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, addOwnershipHistorySRFCallback,
c, c.producer.AddMetadataPrefix(vbKey), &operr)
if err == common.ErrRetryTimeout {
logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid())
continue
} else if operr == common.ErrEncryptionLevelChanged {
continue
}

logging.Infof("%s [%s:%s:%d] vb: %d STREAMREQ Failed. Inserting entry: %#v to vbFlogChan",
logPrefix, c.workerName, c.tcpPort, c.Pid(), e.VBucket, vbFlog)
c.vbFlogChan <- vbFlog
continue
}
// already processed, no need to push to aggfeed
continue
}

if c.aggDCPFeedMem > c.aggDCPFeedMemCap {
time.Sleep(10 * time.Millisecond)
}
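The hunk above removes from addToAggChan the same DCP_STREAMREQ handling that the first hunk restores in processDCPEvents; the visible difference between the two copies is that on common.ErrRetryTimeout the restored code returns from the event loop where the removed code used continue. A condensed, illustrative sketch of the dispatch both copies perform for stream-request responses — the types and status codes are stand-ins, not the mcd or consumer definitions, and the KEY_EEXISTS branch is collapsed into the non-success path for brevity:

package main

import "fmt"

// vbFlogEntry mirrors the fields used in the hunks above; the real type lives
// in the consumer package.
type vbFlogEntry struct {
	seqNo          uint64
	statusCode     int
	streamReqRetry bool
	vb             uint16
}

const statusSuccess = 0 // stand-in for mcd.SUCCESS

// handleStreamReqStatus sketches the end of each STREAMREQ branch: every
// response pushes an entry onto vbFlogChan; here anything that is not SUCCESS
// is marked for a stream-request retry, whereas the real code also lets
// KEY_EEXISTS through without a retry.
func handleStreamReqStatus(vb uint16, status int, seqNo uint64, vbFlogChan chan<- *vbFlogEntry) {
	entry := &vbFlogEntry{statusCode: status, vb: vb}
	if status != statusSuccess {
		entry.streamReqRetry = true
		entry.seqNo = seqNo
	}
	vbFlogChan <- entry
}

func main() {
	ch := make(chan *vbFlogEntry, 2)
	handleStreamReqStatus(5, statusSuccess, 0, ch)
	handleStreamReqStatus(6, 1 /* any non-success status */, 1200, ch)

	for i := 0; i < 2; i++ {
		e := <-ch
		fmt.Printf("vb %d: retry=%t seqNo=%d\n", e.vb, e.streamReqRetry, e.seqNo)
	}
}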
8 changes: 1 addition & 7 deletions consumer/v8_consumer.go
@@ -21,12 +21,6 @@ import (
flatbuffers "github.com/google/flatbuffers/go"
)

// Note: Should be a multiple of number of dcpFeeds which we might not know during initialising consumer
// Hence, assuming 8 KV dcpFeeds for an average of 8 KV nodes.
const (
AggChanSizeMultiplier = 8
)

// NewConsumer called by producer to create consumer handle
func NewConsumer(hConfig *common.HandlerConfig, pConfig *common.ProcessConfig, rConfig *common.RebalanceConfig,
index int, uuid, nsServerPort string, eventingNodeUUIDs []string, vbnos []uint16, app *common.AppConfig,
@@ -40,7 +34,7 @@ func NewConsumer(hConfig *common.HandlerConfig, pConfig *common.ProcessConfig, r
isPausing: false,
languageCompatibility: hConfig.LanguageCompatibility,
app: app,
aggDCPFeed: make(chan *memcached.DcpEvent, (AggChanSizeMultiplier * dcpConfig["dataChanSize"].(int))),
aggDCPFeed: make(chan *memcached.DcpEvent, dcpConfig["dataChanSize"].(int)),
aggDCPFeedMemCap: hConfig.AggDCPFeedMemCap,
breakpadOn: pConfig.BreakpadOn,
sourceKeyspace: hConfig.SourceKeyspace,
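Reverting MB-50871 also shrinks the aggregate DCP feed buffer: the AggChanSizeMultiplier constant is deleted and aggDCPFeed is again sized at dataChanSize alone. A small sketch of the before/after channel capacity, assuming an illustrative dataChanSize of 1024 (the real value comes from dcpConfig):

package main

import "fmt"

// dcpEvent stands in for memcached.DcpEvent; only the channel sizing matters here.
type dcpEvent struct{}

func main() {
	dcpConfig := map[string]interface{}{"dataChanSize": 1024} // illustrative value

	// Pre-revert (fix for MB-50871): the aggregate feed was buffered for roughly
	// eight KV-node DCP feeds.
	const aggChanSizeMultiplier = 8
	before := make(chan *dcpEvent, aggChanSizeMultiplier*dcpConfig["dataChanSize"].(int))

	// Post-revert (this commit): the aggregate feed is buffered at dataChanSize only.
	after := make(chan *dcpEvent, dcpConfig["dataChanSize"].(int))

	fmt.Println("aggDCPFeed capacity before revert:", cap(before)) // 8192
	fmt.Println("aggDCPFeed capacity after revert: ", cap(after))  // 1024
}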
