Skip to content

Commit

Permalink
MB-41410 Flush feed's backch before processing reqch messages
Browse files Browse the repository at this point in the history
When there are back-to-back MTR's and delBuckets requests from indexer,
backCh can easily get filled up and lead to a deadlock. By flushing
the backCh before processing a request, we make sure that backch has
enough space to process messages from new requests. This should not
have any functional impact as this flush can be considered as a
special case where feed always selects the backCh over reqCh

Change-Id: I79b053ac924d9658229af523bb1bab07489ca275
  • Loading branch information
varunv-cb committed Sep 17, 2020
1 parent 8b6bcf7 commit 3d51e8c
Showing 1 changed file with 175 additions and 159 deletions.
334 changes: 175 additions & 159 deletions secondary/projector/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,182 +498,198 @@ func (feed *Feed) genServer() {
timeout.Stop()
}()

loop:
for {
select {
case msg = <-feed.reqch:
switch feed.handleCommand(msg) {
case "ok":
feed.stale = 0
case "exit":
break loop
}

case msg = <-feed.backch:
if cmd, ok := msg[0].(*controlStreamRequest); ok {
// This check is required to avoid race in the following scenario:
//
// 1. Indexer sends fCmdStart, feed opens connections with DCP upstream
// and starts sending DCP_STREAMREQ messages for each of the vb's
// 2. Due to a slow memcached, the streamreq message for one vb timesout
// 3. DCPP sends stream begin messages for successful DCP_STREAMREQ
// messages and kvdata puts them in feed.backch
// 4. Due to the timeout error while starting dcp streams, feed.cleanupKeyspace()
// is invoked
// 5. This would clean-up the KVData instance and indexer sends MTR again
// 6. In this MTR, the stream begin messages from earlier MTR would not be
// processed (in feed.waitStreamRequests()) as the opaque values differ
// and these stream begin messages will be read here
//
// As uuid's would be different for different instances of kvdata, we compare the
// uuid's before actually trying to process the STREAM_BEGIN messages. If there is
// a mismatch, we ignore the message

if kvdata, ok := feed.kvdata[cmd.keyspaceId]; ok {
if cmd.uuid != kvdata.uuid {
logging.Infof("%v The kvdata instance: %v for keyspace: '%v' is already cleaned up."+
" Current kvdata instance is: %v. Ignoring controlStreamRequest for vb:%v",
feed.logPrefix, cmd.uuid, cmd.keyspaceId, kvdata.uuid, cmd.vbno)
continue
}
} else { // KVData instance does not exist. Ignore the message
logging.Infof("%v The kvdata instance for keyspace: '%v' does not exist."+
" Ignoring controlStreamRequest for vb:%v", feed.logPrefix, cmd.keyspaceId, cmd.vbno)
continue
}

var reqTs *protobuf.TsVbuuid
if reqTs, ok = feed.reqTss[cmd.keyspaceId]; !ok {
fmsg := "%v ##%x ignoring backch message %T: %v\n"
logging.Warnf(fmsg, prefix, cmd.opaque, cmd, cmd.Repr())
continue
}

seqno, _, sStart, sEnd, _, err := reqTs.Get(cmd.vbno)
if err != nil {
fmsg := "%v ##%x backch flush %v: %v\n"
logging.Errorf(fmsg, prefix, cmd.opaque, cmd, err)
}
if ok && reqTs != nil {
reqTs = reqTs.FilterByVbuckets([]uint16{cmd.vbno})
feed.reqTss[cmd.keyspaceId] = reqTs
}
if cmd.status == mcd.ROLLBACK {
fmsg := "%v ##%x backch flush rollback %T: %v\n"
logging.Infof(fmsg, prefix, cmd.opaque, cmd, cmd.Repr())
rollTs, ok := feed.rollTss[cmd.keyspaceId]
if ok {
rollTs = rollTs.Append(
cmd.vbno, cmd.seqno, cmd.vbuuid, sStart, sEnd, "")
feed.rollTss[cmd.keyspaceId] = rollTs
}

} else if cmd.status == mcd.SUCCESS {
fmsg := "%v ##%x backch flush success %T: %v\n"
logging.Infof(fmsg, prefix, cmd.opaque, cmd, cmd.Repr())
actTs, ok := feed.actTss[cmd.keyspaceId]
if ok {
actTs = actTs.Append(
cmd.vbno, seqno, cmd.vbuuid, sStart, sEnd, "")
feed.actTss[cmd.keyspaceId] = actTs
}

} else {
fmsg := "%v ##%x backch flush error %T: %v\n"
logging.Errorf(fmsg, prefix, cmd.opaque, cmd, cmd.Repr())
handleBackChMsgs := func(msg []interface{}) {
if cmd, ok := msg[0].(*controlStreamRequest); ok {
// This check is required to avoid race in the following scenario:
//
// 1. Indexer sends fCmdStart, feed opens connections with DCP upstream
// and starts sending DCP_STREAMREQ messages for each of the vb's
// 2. Due to a slow memcached, the streamreq message for one vb timesout
// 3. DCPP sends stream begin messages for successful DCP_STREAMREQ
// messages and kvdata puts them in feed.backch
// 4. Due to the timeout error while starting dcp streams, feed.cleanupKeyspace()
// is invoked
// 5. This would clean-up the KVData instance and indexer sends MTR again
// 6. In this MTR, the stream begin messages from earlier MTR would not be
// processed (in feed.waitStreamRequests()) as the opaque values differ
// and these stream begin messages will be read here
//
// As uuid's would be different for different instances of kvdata, we compare the
// uuid's before actually trying to process the STREAM_BEGIN messages. If there is
// a mismatch, we ignore the message

if kvdata, ok := feed.kvdata[cmd.keyspaceId]; ok {
if cmd.uuid != kvdata.uuid {
logging.Infof("%v The kvdata instance: %v for keyspace: '%v' is already cleaned up."+
" Current kvdata instance is: %v. Ignoring controlStreamRequest for vb:%v",
feed.logPrefix, cmd.uuid, cmd.keyspaceId, kvdata.uuid, cmd.vbno)
return
}
} else { // KVData instance does not exist. Ignore the message
logging.Infof("%v The kvdata instance for keyspace: '%v' does not exist."+
" Ignoring controlStreamRequest for vb:%v", feed.logPrefix, cmd.keyspaceId, cmd.vbno)
return
}

} else if cmd, ok := msg[0].(*controlStreamEnd); ok {
// This check is required to avoid race in the following scenario:
// 1. When feed.cleanupKeyspace() is triggerred, it invokes feeder.closeFeed()
// 2. feeder.CloseFeed() would publish STREAMEND messages
// 3. If kvdata reads the STREAMEND messages before closing, it would publish
// the STREAMEND messages to feed.backch
// 4. After feed.cleanupKeyspace() exits, feed processes messages on reqch and backch
// 5. If an fCmdStart from indexer arrives on feed's reqch and it is processed before
// messages on backch, then at the time STREAMEND messages from backch are processed
// the book-keeping for vbuckets goes out of sync with KV engine
//
// As uuid's would be different for different instances of kvdata, we compare the
// uuid's before actually trying to process the STREAM_END messages. If there is
// a mismatch, we ignore the message
if kvdata, ok := feed.kvdata[cmd.keyspaceId]; ok {
if cmd.uuid != kvdata.uuid {
logging.Warnf("%v The kvdata instance: %v for keyspace: '%v' is already cleaned up."+
" Current kvdata instance is: %v. Ignoring controlStreamEnd for vb:%v",
feed.logPrefix, cmd.uuid, cmd.keyspaceId, kvdata.uuid, cmd.vbno)
continue
}
} else { // KVData instance does not exist. Ignore the message
logging.Infof("%v The kvdata instance for keyspace: '%v' does not exist."+
" Ignoring controlStreamEnd for vb:%v", feed.logPrefix, cmd.keyspaceId, cmd.vbno)
continue
}
var reqTs *protobuf.TsVbuuid
if reqTs, ok = feed.reqTss[cmd.keyspaceId]; !ok {
fmsg := "%v ##%x ignoring backch message %T: %v\n"
logging.Warnf(fmsg, prefix, cmd.opaque, cmd, cmd.Repr())
return
}

fmsg := "%v ##%x backch flush %T: %v\n"
seqno, _, sStart, sEnd, _, err := reqTs.Get(cmd.vbno)
if err != nil {
fmsg := "%v ##%x backch flush %v: %v\n"
logging.Errorf(fmsg, prefix, cmd.opaque, cmd, err)
}
if ok && reqTs != nil {
reqTs = reqTs.FilterByVbuckets([]uint16{cmd.vbno})
feed.reqTss[cmd.keyspaceId] = reqTs
}
if cmd.status == mcd.ROLLBACK {
fmsg := "%v ##%x backch flush rollback %T: %v\n"
logging.Infof(fmsg, prefix, cmd.opaque, cmd, cmd.Repr())
reqTs, ok := feed.reqTss[cmd.keyspaceId]
rollTs, ok := feed.rollTss[cmd.keyspaceId]
if ok {
reqTs = reqTs.FilterByVbuckets([]uint16{cmd.vbno})
feed.reqTss[cmd.keyspaceId] = reqTs
rollTs = rollTs.Append(
cmd.vbno, cmd.seqno, cmd.vbuuid, sStart, sEnd, "")
feed.rollTss[cmd.keyspaceId] = rollTs
}

} else if cmd.status == mcd.SUCCESS {
fmsg := "%v ##%x backch flush success %T: %v\n"
logging.Infof(fmsg, prefix, cmd.opaque, cmd, cmd.Repr())
actTs, ok := feed.actTss[cmd.keyspaceId]
if ok {
actTs = actTs.FilterByVbuckets([]uint16{cmd.vbno})
actTs = actTs.Append(
cmd.vbno, seqno, cmd.vbuuid, sStart, sEnd, "")
feed.actTss[cmd.keyspaceId] = actTs
}
rollTs, ok := feed.rollTss[cmd.keyspaceId]
if ok {
rollTs = rollTs.FilterByVbuckets([]uint16{cmd.vbno})
feed.rollTss[cmd.keyspaceId] = rollTs
}

} else if cmd, ok := msg[0].(*controlFinKVData); ok {
// This check is required to avoid race in the following scenario:
// 1. When feed.cleanupKeyspace() is triggerred, it invokes kvdata.Close()
// 2. kvdata.Close() will stop the genServer() and posts response back to feed.
// The defer() block execution in kvdata's genServer() is yet to happen
// 3. feed.cleanupKeyspace() will go-ahead delete the kvdata entry for the bucket
// 4. feed now waits on reqch and backch for messages
// 5. fCmdStart from indexer arrives on feed and feed initializes kvdata for the bucket
// 6. The defer() block from kvdata that got closed starts it's execution and it posts
// controlFinKVData message
// 7. This message would cleanup the bucket again which got initialized with fCmdStart
//
// As uuid's would be different for different instances of kvdata, we compare the
// uuid's before actually trying to process the controlFinKVData message. If there is
// a mismatch, we ignore the message
if kvdata, ok := feed.kvdata[cmd.keyspaceId]; ok {
if cmd.uuid != kvdata.uuid {
logging.Warnf("%v The kvdata instance: %v for keyspace: '%v' is already cleaned up."+
" Current kvdata instance is: %v. Ignoring controlFinKVData",
feed.logPrefix, cmd.uuid, cmd.keyspaceId, kvdata.uuid)
continue
}
} else { // KVData instance does not exist. Ignore the message
logging.Infof("%v The kvdata instance for keyspace: %v does not exist."+
" Ignoring controlFinKVData message", feed.logPrefix, cmd.keyspaceId)
continue
} else {
fmsg := "%v ##%x backch flush error %T: %v\n"
logging.Errorf(fmsg, prefix, cmd.opaque, cmd, cmd.Repr())
}

} else if cmd, ok := msg[0].(*controlStreamEnd); ok {
// This check is required to avoid race in the following scenario:
// 1. When feed.cleanupKeyspace() is triggerred, it invokes feeder.closeFeed()
// 2. feeder.CloseFeed() would publish STREAMEND messages
// 3. If kvdata reads the STREAMEND messages before closing, it would publish
// the STREAMEND messages to feed.backch
// 4. After feed.cleanupKeyspace() exits, feed processes messages on reqch and backch
// 5. If an fCmdStart from indexer arrives on feed's reqch and it is processed before
// messages on backch, then at the time STREAMEND messages from backch are processed
// the book-keeping for vbuckets goes out of sync with KV engine
//
// As uuid's would be different for different instances of kvdata, we compare the
// uuid's before actually trying to process the STREAM_END messages. If there is
// a mismatch, we ignore the message
if kvdata, ok := feed.kvdata[cmd.keyspaceId]; ok {
if cmd.uuid != kvdata.uuid {
logging.Warnf("%v The kvdata instance: %v for keyspace: '%v' is already cleaned up."+
" Current kvdata instance is: %v. Ignoring controlStreamEnd for vb:%v",
feed.logPrefix, cmd.uuid, cmd.keyspaceId, kvdata.uuid, cmd.vbno)
return
}
} else { // KVData instance does not exist. Ignore the message
logging.Infof("%v The kvdata instance for keyspace: '%v' does not exist."+
" Ignoring controlStreamEnd for vb:%v", feed.logPrefix, cmd.keyspaceId, cmd.vbno)
return
}

fmsg := "%v ##%x backch flush %T -- %v\n"
logging.Infof(fmsg, prefix, feed.opaque, cmd, cmd.Repr())
_, ok := feed.actTss[cmd.keyspaceId]
if ok == false {
// Note: bucket could have gone because of a downstream
// delBucket() request.
fmsg := "%v ##%x FinKVData can't find keyspace %q\n"
logging.Warnf(fmsg, prefix, feed.opaque, cmd.keyspaceId)
fmsg := "%v ##%x backch flush %T: %v\n"
logging.Infof(fmsg, prefix, cmd.opaque, cmd, cmd.Repr())
reqTs, ok := feed.reqTss[cmd.keyspaceId]
if ok {
reqTs = reqTs.FilterByVbuckets([]uint16{cmd.vbno})
feed.reqTss[cmd.keyspaceId] = reqTs
}
actTs, ok := feed.actTss[cmd.keyspaceId]
if ok {
actTs = actTs.FilterByVbuckets([]uint16{cmd.vbno})
feed.actTss[cmd.keyspaceId] = actTs
}
rollTs, ok := feed.rollTss[cmd.keyspaceId]
if ok {
rollTs = rollTs.FilterByVbuckets([]uint16{cmd.vbno})
feed.rollTss[cmd.keyspaceId] = rollTs
}

} else if cmd, ok := msg[0].(*controlFinKVData); ok {
// This check is required to avoid race in the following scenario:
// 1. When feed.cleanupKeyspace() is triggerred, it invokes kvdata.Close()
// 2. kvdata.Close() will stop the genServer() and posts response back to feed.
// The defer() block execution in kvdata's genServer() is yet to happen
// 3. feed.cleanupKeyspace() will go-ahead delete the kvdata entry for the bucket
// 4. feed now waits on reqch and backch for messages
// 5. fCmdStart from indexer arrives on feed and feed initializes kvdata for the bucket
// 6. The defer() block from kvdata that got closed starts it's execution and it posts
// controlFinKVData message
// 7. This message would cleanup the bucket again which got initialized with fCmdStart
//
// As uuid's would be different for different instances of kvdata, we compare the
// uuid's before actually trying to process the controlFinKVData message. If there is
// a mismatch, we ignore the message
if kvdata, ok := feed.kvdata[cmd.keyspaceId]; ok {
if cmd.uuid != kvdata.uuid {
logging.Warnf("%v The kvdata instance: %v for keyspace: '%v' is already cleaned up."+
" Current kvdata instance is: %v. Ignoring controlFinKVData",
feed.logPrefix, cmd.uuid, cmd.keyspaceId, kvdata.uuid)
return
}
fmsg = "%v ##%x self deleting keyspace\n"
logging.Infof(fmsg, prefix, feed.opaque)
feed.cleanupKeyspace(cmd.keyspaceId, false)
} else { // KVData instance does not exist. Ignore the message
logging.Infof("%v The kvdata instance for keyspace: %v does not exist."+
" Ignoring controlFinKVData message", feed.logPrefix, cmd.keyspaceId)
return
}

} else {
fmsg := "%v ##%x backch flush %T: %v\n"
logging.Fatalf(fmsg, prefix, feed.opaque, msg[0], msg[0])
fmsg := "%v ##%x backch flush %T -- %v\n"
logging.Infof(fmsg, prefix, feed.opaque, cmd, cmd.Repr())
_, ok := feed.actTss[cmd.keyspaceId]
if ok == false {
// Note: bucket could have gone because of a downstream
// delBucket() request.
fmsg := "%v ##%x FinKVData can't find keyspace %q\n"
logging.Warnf(fmsg, prefix, feed.opaque, cmd.keyspaceId)
}
fmsg = "%v ##%x self deleting keyspace\n"
logging.Infof(fmsg, prefix, feed.opaque)
feed.cleanupKeyspace(cmd.keyspaceId, false)

} else {
fmsg := "%v ##%x backch flush %T: %v\n"
logging.Fatalf(fmsg, prefix, feed.opaque, msg[0], msg[0])
}
}

loop:
for {
// Flush backch messages before processing request
// While processng a backCh message, we compare the UUID of the current
// KVData instance with that of the UUID in the message. If there is a
// mismatch, we ignore the message in backCh. This makes it safer to
// flush the backCh messages before processing a request
for len(feed.backch) > 0 {
select {
case msg = <-feed.backch:
handleBackChMsgs(msg)
default:
}
}

select {
case msg = <-feed.reqch:
switch feed.handleCommand(msg) {
case "ok":
feed.stale = 0
case "exit":
break loop
}

case msg = <-feed.backch:
handleBackChMsgs(msg)
case <-timeout.C:
if len(feed.backch) > 0 { // can happend during rebalance.
logging.Warnf(ctrlMsg, prefix, feed.opaque, len(feed.backch))
Expand Down

0 comments on commit 3d51e8c

Please sign in to comment.