MB-33849 Split control and data processing to different go-routines in kvdata

The control and data processing is split into two go-routines with
this patch: genServer() handles the control path and runScatter()
handles the data path. The only state shared between the two
go-routines is reqTs. runScatter() uses reqTs to update Seqno in case
of stream-begin messages. Since stream begins are infrequent, reqTs
is protected with the reqTsMutex read-write mutex.
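
For illustration, the sharing pattern looks roughly like the sketch
below. The holder type and map are hypothetical stand-ins; the real
code guards a *protobuf.TsVbuuid field inside KVData:

    package sketch

    import "sync"

    // tsHolder is a hypothetical stand-in for the shared reqTs state.
    type tsHolder struct {
        mu    sync.RWMutex      // reqTsMutex in the patch
        seqno map[uint16]uint64 // vbno -> seqno
    }

    // Control path (genServer, UpdateTs): merge new seqnos under the
    // write lock.
    func (h *tsHolder) update(vbno uint16, s uint64) {
        h.mu.Lock()
        defer h.mu.Unlock()
        h.seqno[vbno] = s
    }

    // Data path (runScatter, stream begin): read the seqno under the
    // read lock; stream begins are rare, so contention stays low.
    func (h *tsHolder) seqnoFor(vbno uint16) uint64 {
        h.mu.RLock()
        defer h.mu.RUnlock()
        return h.seqno[vbno]
    }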

As there are two separate go-routines, there is no guaranteed order
in which they terminate. To make sure both go-routines exit
gracefully, the following channels have been introduced:

a. genServerFinCh - Controls the execution of the genServer() routine.
genServer() will exit when this channel is closed

b. runScatterFinCh - Controls the execution of the runScatter() routine.
runScatter() will exit when this channel is closed

c. genServerStopCh - Blocks all incoming requests from reaching
KVdata. Incoming requests will error out with ErrorClosed when this
channel is closed

The semantics followed are (a sketch illustrating them follows this list):

a. When runScatter() exits, close genServerFinCh so that
genServer() terminates (in case runScatter() exits first, closing
genServerFinCh ensures that genServer() will eventually exit)

b. If genServer() exits, then
    1. Close genServerStopCh to block all incoming requests to
       this kvdata instance
    2. Close runScatterFinCh so that runScatter() terminates and
       data processing is stopped for this kvdata instance
    3. Publish StreamEnd to the feed to clear book-keeping and close
       all workers
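
A minimal runnable sketch of this shutdown protocol (the channel
names mirror the patch; the goroutine bodies are illustrative only):

    package main

    import "sync"

    func main() {
        genServerFinCh := make(chan bool)  // closed => genServer exits
        runScatterFinCh := make(chan bool) // closed => runScatter exits
        genServerStopCh := make(chan bool) // closed => new requests fail
        mutch := make(chan int)            // stand-in for the DCP mutation channel

        var wg sync.WaitGroup
        wg.Add(2)

        go func() { // runScatter: data path
            defer wg.Done()
            defer close(genServerFinCh) // (a) make genServer follow
            for {
                select {
                case _, ok := <-mutch:
                    if !ok {
                        return // upstream closed
                    }
                case <-runScatterFinCh:
                    return // genServer exited first
                }
            }
        }()

        go func() { // genServer: control path
            defer wg.Done()
            defer close(runScatterFinCh) // (b.2) stop the data path
            defer close(genServerStopCh) // (b.1) fail incoming requests
            <-genServerFinCh             // control loop elided
        }()

        close(mutch) // simulate upstream closing; both goroutines wind down
        wg.Wait()
    }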

Change-Id: Ia619b417d2d1dcaa3acfa874a5e5dca918b7a34d
varunv-cb committed Mar 5, 2021
1 parent c4ac8a6 commit 69e0f6f
Showing 1 changed file with 109 additions and 54 deletions.
secondary/projector/kvdata.go
@@ -1,18 +1,20 @@
// data-path concurrency model:
//
// back-channel
// feed <---------------------* NewKVData()
// StreamRequest | | *---> worker
// StreamEnd | (spawn) |
// | | *---> worker
// | | |
// AddEngines() --*-----> runScatter ---------*---> worker
// Mutations (dcp_feed)
// back-channel |
// feed <---------------------* NewKVData() |
// StreamRequest | |____________ |
// StreamEnd | | | | *---> worker
// | (spawn) (spawn) | |
// | | | | *---> worker
// | | | V |
// AddEngines() --*-----> genServer runScatter-------*---> worker
// |
// DeleteEngines() --*
// |
// GetStatistics() --*
// |
// Close() --*
//

package projector

@@ -21,6 +23,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

@@ -57,6 +60,12 @@ type KVData struct {
sbch chan []interface{}
finch chan bool
stopScatter uint32

reqTsMutex *sync.RWMutex
// Closing genServerStopCh will stop all incoming requests to the control path
genServerStopCh chan bool
genServerFinCh chan bool
runScatterFinCh chan bool
// misc.
syncTimeout time.Duration // in milliseconds
logPrefix string
@@ -227,8 +236,13 @@ func NewKVData(
endpoints: make(map[string]c.RouterEndpoint),
// 16 is enough, there can't be more than that many out-standing
// control calls on this feed.
sbch: make(chan []interface{}, 16),
finch: make(chan bool),
sbch: make(chan []interface{}, 16),

reqTsMutex: &sync.RWMutex{},
genServerStopCh: make(chan bool),
genServerFinCh: make(chan bool),
runScatterFinCh: make(chan bool),

stats: &KvdataStats{},
kvaddr: kvaddr,
async: async,
@@ -263,6 +277,7 @@
// Gather stats pointers from all workers
kvdata.updateWorkerStats()

go kvdata.genServer(reqTs)
go kvdata.runScatter(reqTs, mutch)
logging.Infof("%v ##%x started, uuid: %v ...\n", kvdata.logPrefix, opaque, kvdata.uuid)
return kvdata, nil
@@ -293,7 +308,7 @@ func (kvdata *KVData) AddEngines(

respch := make(chan []interface{}, 1)
cmd := []interface{}{kvCmdAddEngines, opaque, engines, eps, respch}
resp, err := c.FailsafeOp(kvdata.sbch, respch, cmd, kvdata.finch)
resp, err := c.FailsafeOp(kvdata.sbch, respch, cmd, kvdata.genServerStopCh)
if err = c.OpError(err, resp, 1); err != nil {
return nil, err
}
@@ -304,47 +319,47 @@
func (kvdata *KVData) DeleteEngines(opaque uint16, engineKeys []uint64, collectionIds []uint32) error {
respch := make(chan []interface{}, 1)
cmd := []interface{}{kvCmdDelEngines, opaque, engineKeys, collectionIds, respch}
_, err := c.FailsafeOp(kvdata.sbch, respch, cmd, kvdata.finch)
_, err := c.FailsafeOp(kvdata.sbch, respch, cmd, kvdata.genServerStopCh)
return err
}

// UpdateTs with new set of {vbno,seqno}, synchronous call.
func (kvdata *KVData) UpdateTs(opaque uint16, ts *protobuf.TsVbuuid) error {
respch := make(chan []interface{}, 1)
cmd := []interface{}{kvCmdTs, opaque, ts, respch}
_, err := c.FailsafeOp(kvdata.sbch, respch, cmd, kvdata.finch)
_, err := c.FailsafeOp(kvdata.sbch, respch, cmd, kvdata.genServerStopCh)
return err
}

// GetStatistics from kv data path, synchronous call.
func (kvdata *KVData) GetStatistics() map[string]interface{} {
respch := make(chan []interface{}, 1)
cmd := []interface{}{kvCmdGetStats, respch}
resp, _ := c.FailsafeOp(kvdata.sbch, respch, cmd, kvdata.finch)
resp, _ := c.FailsafeOp(kvdata.sbch, respch, cmd, kvdata.genServerStopCh)
return resp[0].(map[string]interface{})
}

// ResetConfig for kvdata.
func (kvdata *KVData) ResetConfig(config c.Config) error {
respch := make(chan []interface{}, 1)
cmd := []interface{}{kvCmdResetConfig, config, respch}
_, err := c.FailsafeOp(kvdata.sbch, respch, cmd, kvdata.finch)
_, err := c.FailsafeOp(kvdata.sbch, respch, cmd, kvdata.genServerStopCh)
return err
}

// ReloadHeartbeat for kvdata.
func (kvdata *KVData) ReloadHeartbeat() error {
respch := make(chan []interface{}, 1)
cmd := []interface{}{kvCmdReloadHeartBeat, respch}
_, err := c.FailsafeOp(kvdata.sbch, respch, cmd, kvdata.finch)
_, err := c.FailsafeOp(kvdata.sbch, respch, cmd, kvdata.genServerStopCh)
return err
}

// Close kvdata kv data path, synchronous call.
func (kvdata *KVData) Close() error {
respch := make(chan []interface{}, 1)
cmd := []interface{}{kvCmdClose, respch}
_, err := c.FailsafeOp(kvdata.sbch, respch, cmd, kvdata.finch)
_, err := c.FailsafeOp(kvdata.sbch, respch, cmd, kvdata.genServerStopCh)
return err
}
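
All of the synchronous calls above go through c.FailsafeOp from the
common package. The pattern is roughly the sketch below, reconstructed
from the call sites; the real implementation and its ErrorClosed value
live in the common package, so treat this as an assumption:

    package sketch

    import "errors"

    var errClosed = errors.New("closed") // stand-in for ErrorClosed

    // failsafeOp posts cmd on cmdch and waits for a reply on respch,
    // giving up at either step once stopch (genServerStopCh after
    // this patch) is closed.
    func failsafeOp(cmdch, respch chan []interface{},
        cmd []interface{}, stopch chan bool) ([]interface{}, error) {

        select {
        case cmdch <- cmd: // handed to genServer
        case <-stopch: // genServer no longer accepts commands
            return nil, errClosed
        }
        select {
        case resp := <-respch:
            return resp, nil
        case <-stopch: // genServer exited before replying
            return nil, errClosed
        }
    }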

@@ -392,18 +407,65 @@ func (kvdata *KVData) runScatter(
logging.Errorf(fmsg, kvdata.logPrefix, kvdata.opaque, r)
logging.Errorf("%s", logging.StackTrace())
}

// Close genServerFinCh to terminate genServer() in case runScatter() exits first
close(kvdata.genServerFinCh)

logging.Infof("%v ##%x runScatter() ... stopped\n", kvdata.logPrefix, kvdata.opaque)
}()

loop:
for {
select {
case m, ok := <-mutch:
if ok == false || atomic.LoadUint32(&kvdata.stopScatter) == 1 { // upstream has closed
break loop
}
kvdata.stats.eventCount.Add(1)
seqno, err := kvdata.scatterMutation(m, ts)
if err != nil {
fmsg := "%v ##%x Error during scatter mutation while posting: %v, err: %v"
logging.Errorf(fmsg, kvdata.logPrefix, kvdata.opaque, m.Opcode, err)
break loop
}
kvdata.stats.vbseqnos[m.VBucket].Set(uint64(seqno))

// In case genServer() terminates first, it will close runScatterFinCh to
// terminate the runScatter() go-routine
case <-kvdata.runScatterFinCh:
break loop
}
}
}

func (kvdata *KVData) genServer(reqTs *protobuf.TsVbuuid) {

defer func() {
if r := recover(); r != nil {
fmsg := "%v ##%x genServer() crashed: %v\n"
logging.Errorf(fmsg, kvdata.logPrefix, kvdata.opaque, r)
logging.Errorf("%s", logging.StackTrace())
}

// Close genServerStopCh so that any synchronous control message waiting
// for a response will return
close(kvdata.genServerStopCh)

// Close runScatterFinCh to stop the datapath
close(kvdata.runScatterFinCh)

kvdata.publishStreamEnd()
// shutdown workers
for _, worker := range kvdata.workers {
worker.Close()
}
kvdata.workers = nil
kvdata.feed.PostFinKVdata(kvdata.keyspaceId, kvdata.uuid)
close(kvdata.finch)

// Update closed in the stats object and log the stats before exiting
kvdata.stats.closed.Set(true)
kvdata.logStats()
logging.Infof("%v ##%x ... stopped\n", kvdata.logPrefix, kvdata.opaque)
logging.Infof("%v ##%x genServer()... stopped\n", kvdata.logPrefix, kvdata.opaque)
}()

kvdata.heartBeat = time.After(kvdata.syncTimeout)
@@ -412,24 +474,11 @@

loop:
for {
// Prioritize control channel over other channels
select {
case msg := <-kvdata.sbch:
if breakloop := kvdata.handleCommand(msg, ts); breakloop {
if breakloop := kvdata.handleCommand(msg, reqTs); breakloop {
break loop
}
default:
}

select {
case m, ok := <-mutch:
// upstream has closed (or) feed is cleaning up keyspace
if ok == false || atomic.LoadUint32(&kvdata.stopScatter) == 1 {
break loop
}
kvdata.stats.eventCount.Add(1)
seqno, _ := kvdata.scatterMutation(m, ts)
kvdata.stats.vbseqnos[m.VBucket].Set(uint64(seqno))

case <-kvdata.heartBeat:
kvdata.heartBeat = nil
@@ -449,10 +498,10 @@
}
}()

case msg := <-kvdata.sbch:
if breakloop := kvdata.handleCommand(msg, ts); breakloop {
break loop
}
// In case runScatter() terminates first, it will close genServerFinCh
// to terminate genServer()
case <-kvdata.genServerFinCh:
break loop
}
}
}
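
The loop above uses a common Go priority pattern: a non-blocking
select (with an empty default) drains the control channel first, and
only then falls through to the blocking select over all channels. A
minimal illustration of the pattern (hypothetical channels):

    package sketch

    // prioritized returns the next event, preferring ctrl over data
    // when both are ready (illustrative only).
    func prioritized(ctrl, data chan int) int {
        select {
        case c := <-ctrl:
            return c // pending control message wins
        default:
            // no control message pending; fall through
        }
        select {
        case c := <-ctrl:
            return c
        case d := <-data:
            return d
        }
    }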
@@ -517,7 +566,9 @@ func (kvdata *KVData) handleCommand(msg []interface{}, ts *protobuf.TsVbuuid) bool {

case kvCmdTs:
_ /*opaque*/ = msg[1].(uint16)
kvdata.reqTsMutex.Lock()
ts = ts.Union(msg[2].(*protobuf.TsVbuuid))
kvdata.reqTsMutex.Unlock()
respch := msg[3].(chan []interface{})
kvdata.stats.tsCount.Add(1)
respch <- []interface{}{nil}
@@ -593,8 +644,8 @@ func (kvdata *KVData) scatterMutation(
logging.Infof(fmsg, kvdata.logPrefix, m.Opaque, arg1)

if kvdata.async {
if err := worker.Event(m); err != nil {
panic(err)
if err = worker.Event(m); err != nil {
return
}
}

@@ -604,15 +655,19 @@
logging.Errorf(fmsg, kvdata.logPrefix, m.Opaque, m.Status, arg1)

} else if m.VBuuid, _, err = m.FailoverLog.Latest(); err != nil {
panic(err)
return

} else {
fmsg := "%v ##%x StreamRequest: %v\n"
arg1 := logging.TagUD(m)
logging.Tracef(fmsg, kvdata.logPrefix, m.Opaque, arg1)

kvdata.reqTsMutex.RLock()
m.Seqno, _ = ts.SeqnoFor(vbno)
if err := worker.Event(m); err != nil {
panic(err)
kvdata.reqTsMutex.RUnlock()

if err = worker.Event(m); err != nil {
return
}
seqno = m.Seqno
}
@@ -629,16 +684,16 @@
fmsg := "%v ##%x StreamEnd: %v\n"
arg1 := logging.TagUD(m)
logging.Tracef(fmsg, kvdata.logPrefix, m.Opaque, arg1)
if err := worker.Event(m); err != nil {
panic(err)
if err = worker.Event(m); err != nil {
return
}
}
kvdata.stats.endCount.Add(1)
kvdata.feed.PostStreamEnd(kvdata.keyspaceId, m, kvdata.uuid)

case mcd.DCP_SNAPSHOT:
if worker.Event(m) != nil {
panic(err)
if err = worker.Event(m); err != nil {
return
}
snapwindow := int64(m.SnapendSeq - m.SnapstartSeq + 1)
if snapwindow > 50000 {
@@ -649,8 +704,8 @@

case mcd.DCP_MUTATION, mcd.DCP_DELETION, mcd.DCP_EXPIRATION:
seqno = m.Seqno
if err := worker.Event(m); err != nil {
panic(err)
if err = worker.Event(m); err != nil {
return
}
switch m.Opcode {
case mcd.DCP_MUTATION:
@@ -665,8 +720,8 @@
fmsg := "%v ##%x SystemEvent: %v\n"
logging.Tracef(fmsg, kvdata.logPrefix, m.Opaque, m)
seqno = m.Seqno
if err := worker.Event(m); err != nil {
panic(err)
if err = worker.Event(m); err != nil {
return
}
switch m.EventType {
case mcd.COLLECTION_CREATE:
@@ -687,17 +742,17 @@
fmsg := "%v ##%x SeqnoAdvanced event: %v\n"
logging.Tracef(fmsg, kvdata.logPrefix, m.Opaque, m)
seqno = m.Seqno
if err := worker.Event(m); err != nil {
panic(err)
if err = worker.Event(m); err != nil {
return
}
kvdata.stats.seqnoAdvanced.Add(1)

case mcd.DCP_OSO_SNAPSHOT: // Propagate OsoSnapshotEvent to workers

fmsg := "%v ##%x Received OSO Snapshot event: %v for vbucket: %v\n"
logging.Infof(fmsg, kvdata.logPrefix, m.Opaque, m.EventType, vbno)
if err := worker.Event(m); err != nil {
panic(err)
if err = worker.Event(m); err != nil {
return
}
switch m.EventType {
case mcd.OSO_SNAPSHOT_START:
