Skip to content

Commit

Permalink
MB-49570: Start dcp stream for a single collection id
Browse files Browse the repository at this point in the history
Change-Id: I62ec456180b69f7b667ef2ad6981f789553a0009
Reviewed-on: https://review.couchbase.org/c/eventing/+/167542
Tested-by: <ankit.prabhu@couchbase.com>
Reviewed-by: Jeelan Basha Poola <jeelan.poola@couchbase.com>
Reviewed-by: CI Bot
  • Loading branch information
AnkitPrabhu committed Dec 17, 2021
1 parent 67ef79f commit 9061db3
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 29 deletions.
5 changes: 5 additions & 0 deletions common/common.go
Expand Up @@ -3,6 +3,7 @@ package common
import (
"crypto/x509"
"errors"
"fmt"
"net"

"github.com/couchbase/cbauth/metakv"
Expand Down Expand Up @@ -623,3 +624,7 @@ var (
const (
CurlFeature uint32 = 1 << iota
)

// Uint32ToHex returns the lowercase hexadecimal representation of v,
// with no "0x" prefix and no zero padding (e.g. 255 -> "ff", 0 -> "0").
func Uint32ToHex(v uint32) string {
	return fmt.Sprintf("%x", v)
}
4 changes: 3 additions & 1 deletion consumer/http_handlers.go
Expand Up @@ -132,7 +132,9 @@ func (c *Consumer) dcpEventsRemainingToProcess() error {
c.statsRWMutex.Unlock()

var seqNos []uint64
err := util.Retry(util.NewFixedBackoff(clusterOpRetryInterval), c.retryCount, util.GetSeqnos, c.producer.NsServerHostPort(), "default", c.sourceKeyspace.BucketName, c.srcCid, &seqNos)
err := util.Retry(util.NewFixedBackoff(clusterOpRetryInterval), c.retryCount, util.GetSeqnos,
c.producer.NsServerHostPort(), "default",
c.sourceKeyspace.BucketName, c.srcCid, &seqNos)
if err != nil {
logging.Errorf("%s [%s:%s:%d] Failed to fetch get_all_vb_seqnos, err: %v",
logPrefix, c.workerName, c.tcpPort, c.Pid(), err)
Expand Down
24 changes: 11 additions & 13 deletions consumer/process_events.go
Expand Up @@ -293,12 +293,12 @@ func (c *Consumer) processDCPEvents() {
}

case mcd.DCP_SYSTEM_EVENT:
c.checkAndSendNoOp(e.Seqno, e.VBucket)
c.SendNoOp(e.Seqno, e.VBucket)
c.vbProcessingStats.updateVbStat(e.VBucket, "last_read_seq_no", e.Seqno)
c.vbProcessingStats.updateVbStat(e.VBucket, "manifest_id", string(e.ManifestUID))

case mcd.DCP_SEQNO_ADVANCED:
c.checkAndSendNoOp(e.Seqno, e.VBucket)
c.SendNoOp(e.Seqno, e.VBucket)
c.vbProcessingStats.updateVbStat(e.VBucket, "last_read_seq_no", e.Seqno)

default:
Expand Down Expand Up @@ -405,7 +405,8 @@ func (c *Consumer) startDcp() error {
var vbSeqnos []uint64
// Fetch high seq number only if dcp stream boundary is from now
if c.dcpStreamBoundary == common.DcpFromNow {
err = util.Retry(util.NewFixedBackoff(clusterOpRetryInterval), c.retryCount, util.GetSeqnos, c.producer.NsServerHostPort(), "default", c.sourceKeyspace.BucketName, c.srcCid, &vbSeqnos)
err = util.Retry(util.NewFixedBackoff(clusterOpRetryInterval), c.retryCount, util.GetSeqnos, c.producer.NsServerHostPort(),
"default", c.sourceKeyspace.BucketName, c.srcCid, &vbSeqnos)
if err != nil && c.dcpStreamBoundary != common.DcpEverything {
logging.Errorf("%s [%s:%s:%d] Failed to fetch vb seqnos, err: %v", logPrefix, c.workerName, c.tcpPort, c.Pid(), err)
return nil
Expand Down Expand Up @@ -878,7 +879,8 @@ func (c *Consumer) dcpRequestStreamHandle(vb uint16, vbBlob *vbucketKVBlob, star
}

c.dcpStreamReqCounter++
err = dcpFeed.DcpRequestStream(vb, opaque, flags, vbBlob.VBuuid, start, end, snapStart, snapEnd, mid)
hexColId := common.Uint32ToHex(c.srcCid)
err = dcpFeed.DcpRequestStream(vb, opaque, flags, vbBlob.VBuuid, start, end, snapStart, snapEnd, mid, hexColId)
if err != nil {
c.dcpStreamReqErrCounter++
logging.Errorf("%s [%s:%s:%d] vb: %d STREAMREQ call failed on dcpFeed: %v, err: %v",
Expand Down Expand Up @@ -1066,7 +1068,9 @@ func (c *Consumer) handleFailoverLog() {
return
default:
var vbSeqNos []uint64
err := util.Retry(util.NewFixedBackoff(clusterOpRetryInterval), c.retryCount, util.GetSeqnos, c.producer.NsServerHostPort(), "default", c.sourceKeyspace.BucketName, c.srcCid, &vbSeqNos)
err := util.Retry(util.NewFixedBackoff(clusterOpRetryInterval), c.retryCount,
util.GetSeqnos, c.producer.NsServerHostPort(), "default",
c.sourceKeyspace.BucketName, c.srcCid, &vbSeqNos)
if err == nil {
break vbLabel
}
Expand Down Expand Up @@ -1165,9 +1169,8 @@ func (c *Consumer) sendEvent(e *cb.DcpEvent) error {
return nil
}

func (c *Consumer) checkAndSendNoOp(seqNo uint64, partition uint16) {
lastSent := c.vbProcessingStats.getVbStat(partition, "last_sent_seq_no").(uint64)
if !c.producer.IsTrapEvent() && (seqNo-lastSent) >= noOpMsgSendThreshold {
// SendNoOp forwards a no-op event carrying seqNo for the given vbucket
// partition, so downstream bookkeeping (e.g. last-sent sequence numbers)
// keeps advancing even when no user mutation is delivered. Nothing is sent
// while the producer reports a trap event in progress — presumably to avoid
// interfering with event trapping; confirm against producer semantics.
func (c *Consumer) SendNoOp(seqNo uint64, partition uint16) {
	if c.producer.IsTrapEvent() {
		return
	}
	c.sendNoOpEvent(seqNo, partition)
}
Expand Down Expand Up @@ -1413,11 +1416,6 @@ func (c *Consumer) filterMutations(e *cb.DcpEvent) bool {
c.filterVbEventsRWMutex.RUnlock()

c.vbProcessingStats.updateVbStat(e.VBucket, "last_read_seq_no", e.Seqno)
if c.srcCid != e.CollectionID {
c.checkAndSendNoOp(e.Seqno, e.VBucket)
return true
}

return false
}

Expand Down
20 changes: 14 additions & 6 deletions dcp/transport/client/dcp_feed.go
Expand Up @@ -139,12 +139,13 @@ func (feed *DcpFeed) DcpGetSeqnos() (map[uint16]uint64, error) {
// DcpRequestStream for a single vbucket.
func (feed *DcpFeed) DcpRequestStream(vbno, opaqueMSB uint16, flags uint32,
vuuid, startSequence, endSequence, snapStart, snapEnd uint64,
manifestUID string) error {
manifestUID, collectionId string) error {

respch := make(chan []interface{}, 1)
cmd := []interface{}{
dfCmdRequestStream, vbno, opaqueMSB, flags, vuuid,
startSequence, endSequence, snapStart, snapEnd, manifestUID,
startSequence, endSequence, snapStart, snapEnd,
manifestUID, collectionId,
respch}
resp, err := failsafeOp(feed.reqch, respch, cmd, feed.finch)
return opError(err, resp, 0)
Expand Down Expand Up @@ -263,10 +264,13 @@ func (feed *DcpFeed) handleControlRequest(
snapStart, snapEnd := msg[7].(uint64), msg[8].(uint64)

manifestUID := msg[9].(string)
respch := msg[10].(chan []interface{})
collectionId := msg[10].(string)

respch := msg[11].(chan []interface{})
err := feed.doDcpRequestStream(
vbno, opaqueMSB, flags, vuuid,
startSequence, endSequence, snapStart, snapEnd, manifestUID)
startSequence, endSequence, snapStart, snapEnd,
manifestUID, collectionId)
respch <- []interface{}{err}

case dfCmdCloseStream:
Expand Down Expand Up @@ -716,7 +720,7 @@ func (feed *DcpFeed) doControlRequest(opaque uint16, key string, value []byte, r
func (feed *DcpFeed) doDcpRequestStream(
vbno, opaqueMSB uint16, flags uint32,
vuuid, startSequence, endSequence, snapStart, snapEnd uint64,
manifestUID string) error {
manifestUID, collectionId string) error {

rq := &transport.MCRequest{
Opcode: transport.DCP_STREAMREQ,
Expand All @@ -737,6 +741,9 @@ func (feed *DcpFeed) doDcpRequestStream(
requestValue := &StreamRequestValue{}
if feed.collectionAware {
requestValue.ManifestUID = manifestUID
if collectionId != "" {
requestValue.CollectionIDs = []string{collectionId}
}
body, _ := json.Marshal(requestValue)
rq.Body = body
}
Expand Down Expand Up @@ -924,7 +931,8 @@ func vbOpaque(opq32 uint32) uint16 {
}

// StreamRequestValue is the JSON body attached to a collection-aware DCP
// stream request (DCP_STREAMREQ). Fields left empty are omitted from the
// marshalled payload entirely.
type StreamRequestValue struct {
// ManifestUID is the collections manifest UID (hex string) the stream
// request is made against.
ManifestUID string `json:"uid,omitempty"`
// CollectionIDs restricts the stream to the listed collection IDs
// (hex strings); when empty the whole bucket is streamed.
CollectionIDs []string `json:"collections,omitempty"`
}

// DcpStream is per stream data structure over an DCP Connection.
Expand Down
20 changes: 13 additions & 7 deletions dcp/upr.go
Expand Up @@ -235,7 +235,8 @@ const (
// Synchronous call.
func (feed *DcpFeed) DcpRequestStream(
vb uint16, opaque uint16, flags uint32,
vbuuid, startSequence, endSequence, snapStart, snapEnd uint64, manifestUID string) error {
vbuuid, startSequence, endSequence, snapStart, snapEnd uint64,
manifestUID, collectionId string) error {

// only request active vbucket
if feed.activeVbOnly {
Expand All @@ -244,8 +245,9 @@ func (feed *DcpFeed) DcpRequestStream(

respch := make(chan []interface{}, 1)
cmd := []interface{}{
ufCmdRequestStream, vb, opaque, flags, vbuuid, startSequence,
endSequence, snapStart, snapEnd, manifestUID, respch}
ufCmdRequestStream, vb, opaque, flags, vbuuid,
startSequence, endSequence, snapStart, snapEnd,
manifestUID, collectionId, respch}
resp, err := failsafeOp(feed.reqch, respch, cmd, feed.finch)
return opError(err, resp, 0)
}
Expand Down Expand Up @@ -319,10 +321,13 @@ loop:
startSeq, endSeq := msg[5].(uint64), msg[6].(uint64)
snapStart, snapEnd := msg[7].(uint64), msg[8].(uint64)
manifestUID := msg[9].(string)
collectionId := msg[10].(string)

err := feed.dcpRequestStream(
vb, opaque, flags, vbuuid, startSeq, endSeq,
snapStart, snapEnd, manifestUID)
respch := msg[10].(chan []interface{})
snapStart, snapEnd,
manifestUID, collectionId)
respch := msg[11].(chan []interface{})
respch <- []interface{}{err}

case ufCmdCloseStream:
Expand Down Expand Up @@ -444,7 +449,8 @@ func (feed *DcpFeed) reConnectToNodes(

func (feed *DcpFeed) dcpRequestStream(
vb uint16, opaque uint16, flags uint32,
vbuuid, startSequence, endSequence, snapStart, snapEnd uint64, manifestUID string) error {
vbuuid, startSequence, endSequence, snapStart, snapEnd uint64,
manifestUID, collectionId string) error {

prefix := feed.logPrefix
vbm := feed.bucket.VBServerMap()
Expand Down Expand Up @@ -483,7 +489,7 @@ func (feed *DcpFeed) dcpRequestStream(
}
err = singleFeed.dcpFeed.DcpRequestStream(
vb, opaque, flags, vbuuid, startSequence, endSequence,
snapStart, snapEnd, manifestUID)
snapStart, snapEnd, manifestUID, collectionId)
if err != nil {
fmsg := "%v ##%x DcpFeed %v failed, trying next"
logging.Errorf(fmsg, prefix, opaque, singleFeed.dcpFeed.Name())
Expand Down
4 changes: 3 additions & 1 deletion producer/bucket_ops.go
Expand Up @@ -209,9 +209,11 @@ var openDcpStreamFromZero = func(args ...interface{}) error {
p := args[3].(*Producer)
id := args[4].(int)
keyspaceExist := args[5].(*bool)
metaCid := p.GetMetadataCid()
hexCid := common.Uint32ToHex(metaCid)

err := dcpFeed.DcpRequestStream(vb, uint16(vb), uint32(0), vbuuid, uint64(0),
uint64(0xFFFFFFFFFFFFFFFF), uint64(0), uint64(0xFFFFFFFFFFFFFFFF), "0")
uint64(0xFFFFFFFFFFFFFFFF), uint64(0), uint64(0xFFFFFFFFFFFFFFFF), "0", hexCid)
if err != nil {
logging.Errorf("%s [%s:%d:id_%d] vb: %d failed to request stream error: %v",
logPrefix, p.appName, p.LenRunningConsumers(), id, vb, err)
Expand Down
4 changes: 3 additions & 1 deletion producer/producer.go
Expand Up @@ -139,7 +139,9 @@ func (p *Producer) Serve() {
}
}

_, srcCid, err := p.superSup.GetScopeAndCollectionID(p.handlerConfig.SourceKeyspace.BucketName, p.handlerConfig.SourceKeyspace.ScopeName, p.handlerConfig.SourceKeyspace.CollectionName)
_, srcCid, err := p.superSup.GetScopeAndCollectionID(p.handlerConfig.SourceKeyspace.BucketName,
p.handlerConfig.SourceKeyspace.ScopeName,
p.handlerConfig.SourceKeyspace.CollectionName)
if err == common.BucketNotWatched || err == collections.SCOPE_NOT_FOUND || err == collections.COLLECTION_NOT_FOUND {
p.undeployHandler <- false
logging.Errorf("%s [%s] source scope or collection not found %v", logPrefix, p.appName, err)
Expand Down

0 comments on commit 9061db3

Please sign in to comment.