Navigation Menu

Skip to content

Commit

Permalink
MB-39662 add stream reset message
Browse files Browse the repository at this point in the history
This message can be used to reset the stream to 0 in case of
any exception in OSO mode (OSO mode doesn't support restart)

Change-Id: If491e0e884e09edba65566fe20199b2a4f9cf437
  • Loading branch information
deepkaran committed Sep 3, 2020
1 parent 233d6b1 commit 58c66b5
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 6 deletions.
66 changes: 66 additions & 0 deletions secondary/indexer/indexer.go
Expand Up @@ -103,6 +103,7 @@ type indexer struct {
streamKeyspaceIdRequestLock map[common.StreamId]map[string]chan *sync.Mutex
streamKeyspaceIdSessionId map[common.StreamId]map[string]uint64
streamKeyspaceIdCollectionId map[common.StreamId]map[string]string
streamKeyspaceIdOSOException map[common.StreamId]map[string]bool

streamKeyspaceIdPendBuildDone map[common.StreamId]map[string]*buildDoneSpec

Expand Down Expand Up @@ -260,6 +261,7 @@ func NewIndexer(config common.Config) (Indexer, Message) {
streamKeyspaceIdRequestLock: make(map[common.StreamId]map[string]chan *sync.Mutex),
streamKeyspaceIdSessionId: make(map[common.StreamId]map[string]uint64),
streamKeyspaceIdCollectionId: make(map[common.StreamId]map[string]string),
streamKeyspaceIdOSOException: make(map[common.StreamId]map[string]bool),
streamKeyspaceIdPendBuildDone: make(map[common.StreamId]map[string]*buildDoneSpec),
keyspaceIdBuildTs: make(map[string]Timestamp),
buildTsLock: make(map[common.StreamId]map[string]*sync.Mutex),
Expand Down Expand Up @@ -1214,6 +1216,9 @@ func (idx *indexer) handleWorkerMsgs(msg Message) {
case INDEXER_RESET_INDEX_DONE:
idx.handleResetIndexDone(msg)

case RESET_STREAM:
idx.handleResetStream(msg)

default:
logging.Fatalf("Indexer::handleWorkerMsgs Unknown Message %+v", msg)
common.CrashOnError(errors.New("Unknown Msg On Worker Channel"))
Expand Down Expand Up @@ -2925,6 +2930,65 @@ func (idx *indexer) handleInitPrepRecovery(msg Message) {
}
}

// handleResetStream handles a RESET_STREAM message for a given
// stream/keyspace. The request is dropped when its session id fails
// validation, when the stream is already inactive for the keyspace (in which
// case the keyspace state is cleaned up), or when an OSO exception has already
// been recorded for the keyspace. Otherwise the OSO exception is recorded,
// the keyspace is moved to STREAM_PREPARE_RECOVERY and the timekeeper is
// asked to start recovery using a freshly created (zero) restart timestamp.
//
// msg must be a *MsgStreamUpdate; any other concrete type panics on the
// type assertion.
func (idx *indexer) handleResetStream(msg Message) {

	req := msg.(*MsgStreamUpdate)
	keyspaceId := req.GetKeyspaceId()
	streamId := req.GetStreamId()
	sessionId := req.GetSessionId()

	logging.Infof("Indexer::handleResetStream %v %v %v", streamId, keyspaceId, sessionId)

	//a request carrying a session id that fails validation is ignored
	valid, activeSid := idx.validateSessionId(streamId, keyspaceId, sessionId, true)
	if !valid {
		logging.Infof("Indexer::handleResetStream StreamId %v KeyspaceId %v SessionId %v. "+
			"Skipped. Current SessionId %v.", streamId, keyspaceId, sessionId, activeSid)
		return
	}

	//an inactive stream (e.g. all indexes got dropped) has nothing to
	//reset; only the leftover keyspace state is cleaned up
	if idx.getStreamKeyspaceIdState(streamId, keyspaceId) == STREAM_INACTIVE {
		logging.Infof("Indexer::handleResetStream StreamId %v KeyspaceId %v "+
			"State %v. Skipping Reset Stream and Cleaning up.",
			streamId, keyspaceId, idx.getStreamKeyspaceIdState(streamId, keyspaceId))
		idx.cleanupStreamKeyspaceIdState(streamId, keyspaceId)
		return
	}

	//an OSO exception that is already on record means this message
	//is a repeat and can be ignored
	if idx.streamKeyspaceIdOSOException[streamId][keyspaceId] {
		logging.Infof("Indexer::handleResetStream StreamId %v KeyspaceId %v "+
			"OSOException Already Seen. Skipping Reset Stream.",
			streamId, keyspaceId)
		return
	}

	//a reset is not expected while the stream is in any state other than
	//active. This is only logged for safety and debugging; processing
	//continues regardless.
	if idx.getStreamKeyspaceIdState(streamId, keyspaceId) != STREAM_ACTIVE {
		logging.Warnf("Indexer::handleResetStream StreamId %v KeyspaceId %v "+
			"ResetStream received during recovery.", streamId, keyspaceId)
	}

	//record the exception first so that later reset messages get skipped
	idx.streamKeyspaceIdOSOException[streamId][keyspaceId] = true

	idx.setStreamKeyspaceIdState(streamId, keyspaceId, STREAM_PREPARE_RECOVERY)
	logging.Infof("Indexer::handleResetStream StreamId %v KeyspaceId %v State %v "+
		"SessionId %v. Initiate Recovery.", streamId, keyspaceId, STREAM_PREPARE_RECOVERY, sessionId)

	//new ts sized for the configured vbucket count, used to roll back to 0
	numVbuckets := idx.config["numVbuckets"].Int()
	restartTs := common.NewTsVbuuid(GetBucketFromKeyspaceId(keyspaceId), numVbuckets)

	//hand the recovery request to the timekeeper and block for its reply
	recoveryMsg := &MsgRecovery{
		mType:      INDEXER_INIT_PREP_RECOVERY,
		streamId:   streamId,
		keyspaceId: keyspaceId,
		sessionId:  sessionId,
		restartTs:  restartTs,
	}
	idx.tkCmdCh <- recoveryMsg
	<-idx.tkCmdCh
}

func (idx *indexer) handlePrepareUnpause(msg Message) {

logging.Infof("Indexer::handlePrepareUnpause %v", idx.getIndexerState())
Expand Down Expand Up @@ -5763,6 +5827,7 @@ func (idx *indexer) initStreamCollectionIdMap() {

for i := 0; i < int(common.ALL_STREAMS); i++ {
idx.streamKeyspaceIdCollectionId[common.StreamId(i)] = make(map[string]string)
idx.streamKeyspaceIdOSOException[common.StreamId(i)] = make(map[string]bool)
}
}

Expand Down Expand Up @@ -8789,6 +8854,7 @@ func (idx *indexer) cleanupAllStreamKeyspaceIdState(
delete(idx.streamKeyspaceIdObserveFlushDone[streamId], keyspaceId)
delete(idx.streamKeyspaceIdPendBuildDone[streamId], keyspaceId)
delete(idx.streamKeyspaceIdCollectionId[streamId], keyspaceId)
delete(idx.streamKeyspaceIdOSOException[streamId], keyspaceId)
}

func (idx *indexer) prepareStreamKeyspaceIdForFreshStart(
Expand Down
4 changes: 4 additions & 0 deletions secondary/indexer/message.go
Expand Up @@ -161,6 +161,7 @@ const (
CLEANUP_STREAM
CLEANUP_PRJ_STATS
INDEXER_UPDATE_BUILD_TS
RESET_STREAM

CONFIG_SETTINGS_UPDATE

Expand Down Expand Up @@ -478,6 +479,7 @@ func (m *MsgUpdateKeyspaceIdQueue) String() string {
//CLEANUP_STREAM
//CLEANUP_PRJ_STATS
//INDEXER_UPDATE_BUILD_TS
//RESET_STREAM
type MsgStreamUpdate struct {
mType MsgType
streamId common.StreamId
Expand Down Expand Up @@ -2331,6 +2333,8 @@ func (m MsgType) String() string {
return "CLEANUP_PRJ_STATS"
case INDEXER_UPDATE_BUILD_TS:
return "INDEXER_UPDATE_BUILD_TS"
case RESET_STREAM:
return "RESET_STREAM"

case KV_SENDER_RESTART_VBUCKETS:
return "KV_SENDER_RESTART_VBUCKETS"
Expand Down
64 changes: 58 additions & 6 deletions secondary/indexer/timekeeper.go
Expand Up @@ -1159,7 +1159,7 @@ func (tk *timekeeper) handleStreamBegin(cmd Message) {
tk.ss.clearRepairState(streamId, meta.keyspaceId, meta.vbucket)

if tk.ss.getVbRefCount(streamId, meta.keyspaceId, meta.vbucket) > 1 {
logging.Infof("Timekeeper::handleStreamBegin \n\tOwner count > 1. Treat as CONN_ERR. "+
logging.Infof("Timekeeper::handleStreamBegin Owner count > 1. Treat as CONN_ERR. "+
"StreamId %v MutationMeta %v", streamId, meta)

// This will trigger repairStream, as well as replying to supervisor channel
Expand Down Expand Up @@ -1284,7 +1284,7 @@ func (tk *timekeeper) handleStreamEnd(cmd Message) {
// residue StreamEnd from old master can still arrive. Therefore, after
// CONN_ERROR, the total number of StreamEnd > total number of StreamBegin,
// and count will be negative in this case.
logging.Infof("Timekeeper::handleStreamEnd \n\tOwner count < 0. Treat as CONN_ERR. "+
logging.Infof("Timekeeper::handleStreamEnd Owner count < 0. Treat as CONN_ERR. "+
"StreamId %v MutationMeta %v", streamId, meta)

tk.handleStreamConnErrorInternal(streamId, meta.keyspaceId, []Vbucket{meta.vbucket})
Expand Down Expand Up @@ -1345,7 +1345,7 @@ func (tk *timekeeper) handleStreamConnError(cmd Message) {
}

if tk.vbCheckerStopCh == nil {
logging.Infof("Timekeeper::handleStreamConnError \n\t Call RepairMissingStreamBegin to check for vbucket for repair. "+
logging.Infof("Timekeeper::handleStreamConnError Call RepairMissingStreamBegin to check for vbucket for repair. "+
"StreamId %v KeyspaceId %v", streamId, keyspaceId)
tk.vbCheckerStopCh = make(chan bool)
go tk.repairMissingStreamBegin(streamId)
Expand Down Expand Up @@ -1504,7 +1504,7 @@ func (tk *timekeeper) handleStreamConnErrorInternal(streamId common.StreamId, ke

//check if keyspaceId is active in stream
if tk.checkKeyspaceActiveInStream(streamId, keyspaceId) == false {
logging.Warnf("Timekeeper::handleStreamConnError \n\tReceived ConnError for "+
logging.Warnf("Timekeeper::handleStreamConnError Received ConnError for "+
"Inactive KeyspaceId %v Stream %v. Ignored.", keyspaceId, streamId)
return
}
Expand Down Expand Up @@ -1854,13 +1854,39 @@ func (tk *timekeeper) handleStreamRequestDone(cmd Message) {
keyspaceId := cmd.(*MsgStreamInfo).GetKeyspaceId()
activeTs := cmd.(*MsgStreamInfo).GetActiveTs()
pendingTs := cmd.(*MsgStreamInfo).GetPendingTs()
sessionId := cmd.(*MsgStreamInfo).GetSessionId()

logging.Infof("Timekeeper::handleStreamRequestDone StreamId %v KeyspaceId %v",
streamId, keyspaceId)

tk.lock.Lock()
defer tk.lock.Unlock()

//check if keyspace is active in stream
if tk.checkKeyspaceActiveInStream(streamId, keyspaceId) == false {
logging.Warnf("Timekeeper::handleStreamRequestDone Received StreamRequestDone for "+
"Inactive KeyspaceId %v Stream %v. Ignored.", keyspaceId, streamId)
return
}

//if there are no indexes for this keyspace and stream, ignore
if c, ok := tk.ss.streamKeyspaceIdIndexCountMap[streamId][keyspaceId]; !ok || c <= 0 {
logging.Warnf("Timekeeper::handleStreamRequestDone Ignore StreamRequestDone for StreamId %v "+
"KeyspaceId %v. IndexCount %v. ", streamId, keyspaceId, c)
tk.supvCmdch <- &MsgSuccess{}
return
}

//if the session doesn't match, ignore
currSessionId := tk.ss.getSessionId(streamId, keyspaceId)
if sessionId != 0 && sessionId != currSessionId {
logging.Warnf("Timekeeper::handleStreamRequestDone Ignore StreamRequestDone for StreamId %v "+
"KeyspaceId %v. SessionId %v. Current Session %v ", streamId, keyspaceId,
sessionId, currSessionId)
tk.supvCmdch <- &MsgSuccess{}
return
}

tk.ss.streamKeyspaceIdKVActiveTsMap[streamId][keyspaceId] = activeTs
openTs := activeTs

Expand Down Expand Up @@ -1923,13 +1949,39 @@ func (tk *timekeeper) handleRecoveryDone(cmd Message) {
mergeTs := cmd.(*MsgRecovery).GetRestartTs()
activeTs := cmd.(*MsgRecovery).GetActiveTs()
pendingTs := cmd.(*MsgRecovery).GetPendingTs()
sessionId := cmd.(*MsgRecovery).GetSessionId()

logging.Infof("Timekeeper::handleRecoveryDone StreamId %v KeyspaceId %v",
streamId, keyspaceId)

tk.lock.Lock()
defer tk.lock.Unlock()

//check if keyspace is active in stream
if tk.checkKeyspaceActiveInStream(streamId, keyspaceId) == false {
logging.Warnf("Timekeeper::handleRecoveryDone Received RecoveryDone for "+
"Inactive KeyspaceId %v Stream %v. Ignored.", keyspaceId, streamId)
return
}

//if there are no indexes for this keyspace and stream, ignore
if c, ok := tk.ss.streamKeyspaceIdIndexCountMap[streamId][keyspaceId]; !ok || c <= 0 {
logging.Warnf("Timekeeper::handleRecoveryDone Ignore RecoveryDone for StreamId %v "+
"KeyspaceId %v. IndexCount %v. ", streamId, keyspaceId, c)
tk.supvCmdch <- &MsgSuccess{}
return
}

//if the session doesn't match, ignore
currSessionId := tk.ss.getSessionId(streamId, keyspaceId)
if sessionId != 0 && sessionId != currSessionId {
logging.Warnf("Timekeeper::handleRecoveryDone Ignore RecoveryDone for StreamId %v "+
"KeyspaceId %v. SessionId %v. Current Session %v ", streamId, keyspaceId,
sessionId, currSessionId)
tk.supvCmdch <- &MsgSuccess{}
return
}

tk.ss.streamKeyspaceIdKVActiveTsMap[streamId][keyspaceId] = activeTs
openTs := activeTs

Expand Down Expand Up @@ -2200,7 +2252,7 @@ func (tk *timekeeper) checkInitStreamReadyToMerge(streamId common.StreamId,
keyspaceId string, initFlushTs *common.TsVbuuid) bool {

logging.LazyTrace(func() string {
return fmt.Sprintf("Timekeeper::checkInitStreamReadyToMerge \n\t Stream %v KeyspaceId %v len(buildInfo) %v "+
return fmt.Sprintf("Timekeeper::checkInitStreamReadyToMerge Stream %v KeyspaceId %v len(buildInfo) %v "+
"FlushTs %v", streamId, keyspaceId, len(tk.indexBuildInfo), initFlushTs)
})

Expand Down Expand Up @@ -2294,7 +2346,7 @@ func (tk *timekeeper) checkInitStreamReadyToMerge(streamId common.StreamId,
mergeTs: initTsSeq,
sessionId: sessionId}

logging.Infof("Timekeeper::checkInitStreamReadyToMerge \n\t Stream %v "+
logging.Infof("Timekeeper::checkInitStreamReadyToMerge Stream %v "+
"KeyspaceId %v State Changed to INACTIVE", streamId, keyspaceId)
tk.stopTimer(streamId, keyspaceId)
tk.ss.cleanupKeyspaceIdFromStream(streamId, keyspaceId)
Expand Down

0 comments on commit 58c66b5

Please sign in to comment.