Skip to content

Commit

Permalink
MB-46348 Log seqnos of all control messages that are drained in clean…
Browse files Browse the repository at this point in the history
…-up path

This would help to easily catch any seqno order violations due to the
race between clean-up path and data path

Change-Id: I002c3bbc99730fd70b3b2a6620259ebb12724051
  • Loading branch information
varunv-cb committed May 21, 2021
1 parent 709e493 commit e1e7fca
Showing 1 changed file with 34 additions and 1 deletion.
35 changes: 34 additions & 1 deletion secondary/projector/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -1597,14 +1597,47 @@ func (feed *Feed) cleanupKeyspace(keyspaceId string, enginesOk bool) {
// drain the .C channel until it gets closed or if this feed
// happends to get closed.
go func(C <-chan *mc.DcpEvent, finch chan bool) {
// List of all snapshot messages (per vbucket) that are drained in this go-routine
snapshotMsgs := make(map[uint16][][2]uint64)
// List of all seqnoAdvanced messages (per vbucket) that are drained in this go-routine
seqnoAdvancedMsgs := make(map[uint16][]uint64)
// List of all systemEvent messages (per vbucket) that are drained in this go-routine
systemEventMsgs := make(map[uint16][]uint64)

defer func() {
for vb, seqnoAdvancedList := range seqnoAdvancedMsgs {
fmsg := "%v ##%x SeqnoAdvanced for vb: %v, keyspaceId: '%v' is drained in clean-up path. List of Seqnos drained: %v"
logging.Warnf(fmsg, feed.logPrefix, feed.opaque, vb, keyspaceId, seqnoAdvancedList)
}

for vb, systemEventSeqnosList := range systemEventMsgs {
fmsg := "%v ##%x SystemEvent for vb: %v, keyspaceId: '%v' is drained in clean-up path. List of Seqnos drained: %v"
logging.Warnf(fmsg, feed.logPrefix, feed.opaque, vb, keyspaceId, systemEventSeqnosList)
}

for vb, snapshotMsgList := range snapshotMsgs {
fmsg := "%v ##%x Snapshot for vb: %v, keyspaceId: '%v' is drained in clean-up path. List of Snapshots drained: %v"
logging.Warnf(fmsg, feed.logPrefix, feed.opaque, vb, keyspaceId, snapshotMsgList)
}
}()

for {
select {
case m, ok := <-C:
if ok == false {
return
} else if m.Opcode == mcd.DCP_STREAMREQ {
}
switch m.Opcode {
case mcd.DCP_STREAMREQ:
fmsg := "%v ##%x DCP_STREAMREQ for vb:%d, keyspaceId: '%v' is drained in clean-up path"
logging.Errorf(fmsg, feed.logPrefix, m.Opaque, m.VBucket, keyspaceId)
case mcd.DCP_SEQNO_ADVANCED:
seqnoAdvancedMsgs[m.VBucket] = append(seqnoAdvancedMsgs[m.VBucket], m.Seqno)
case mcd.DCP_SYSTEM_EVENT:
systemEventMsgs[m.VBucket] = append(systemEventMsgs[m.VBucket], m.Seqno)
case mcd.DCP_SNAPSHOT:
ss := [2]uint64{m.SnapstartSeq, m.SnapendSeq}
snapshotMsgs[m.VBucket] = append(snapshotMsgs[m.VBucket], ss)
}
case <-finch:
return
Expand Down

0 comments on commit e1e7fca

Please sign in to comment.