From 6345af8f5a8472568a16f734874d7e57af1962d9 Mon Sep 17 00:00:00 2001 From: akhilmd Date: Fri, 23 Dec 2022 12:28:25 +0530 Subject: [PATCH] MB-54921: Store PauseToken in local meta Also, add cleanup helper function. Change-Id: Iac4000b772bd8ee6ef7ebfceab50481f20bac07c --- secondary/indexer/pause_service_manager.go | 74 +++++++++++++++++++++- 1 file changed, 73 insertions(+), 1 deletion(-) diff --git a/secondary/indexer/pause_service_manager.go b/secondary/indexer/pause_service_manager.go index 9cc47b90b..36424e6b2 100644 --- a/secondary/indexer/pause_service_manager.go +++ b/secondary/indexer/pause_service_manager.go @@ -460,7 +460,10 @@ func (m *PauseServiceManager) initStartPhase(bucketName, pauseId string) (err er m.pauseTokensById[pauseId] = pauseToken - // TODO: Add to local metadata + // Add to local metadata + if err = m.registerLocalPauseToken(pauseToken); err != nil { + return err + } // TODO: Add to metaKV @@ -1012,6 +1015,8 @@ func postWithAuthWrapper(logPrefix string, url string, bodyBuf *bytes.Buffer, ta // PauseToken //////////////////////////////////////////////////////////////////////////////////////////////////// +const PauseTokenTag = "PauseToken" + type PauseToken struct { MasterId string MasterIP string @@ -1031,3 +1036,70 @@ func (m *PauseServiceManager) genPauseToken(masterIP, bucketName, pauseId string PauseId: pauseId, } } + +//////////////////////////////////////////////////////////////////////////////////////////////////// +// PauseToken - Lifecycle +//////////////////////////////////////////////////////////////////////////////////////////////////// + +func buildKeyForLocalPauseToken(pauseToken *PauseToken) string { + return fmt.Sprintf("%s_%s", PauseTokenTag, pauseToken.PauseId) +} + +func (m *PauseServiceManager) registerLocalPauseToken(pauseToken *PauseToken) error { + + pToken, err := json.Marshal(pauseToken) + if err != nil { + logging.Errorf("PauseServiceManager::registerLocalPauseToken: Failed to marshal pauseToken[%v]: err[%v]", + pauseToken, err) + return err + } + + respch := make(MsgChannel) + m.supvMsgch <- &MsgClustMgrLocal{ + mType: CLUST_MGR_SET_LOCAL, + key: buildKeyForLocalPauseToken(pauseToken), + value: string(pToken), + respch: respch, + } + + respMsg := <-respch + resp := respMsg.(*MsgClustMgrLocal) + + if err = resp.GetError(); err != nil { + logging.Errorf("PauseServiceManager::registerLocalPauseToken: Unable to set PauseToken In Local Meta" + + "Storage: [%v]", err) + return err + } + + logging.Infof("PauseServiceManager::registerLocalPauseToken: Registered Pause Token In Local Meta: [%v]", + string(pToken)) + + return nil +} + + +func (m *PauseServiceManager) cleanupLocalPauseToken(pauseToken *PauseToken) error { + + logging.Infof("PauseServiceManager::cleanupLocalPauseToken: Cleanup PauseToken[%v]", pauseToken) + + key := buildKeyForLocalPauseToken(pauseToken) + + respch := make(MsgChannel) + m.supvMsgch <- &MsgClustMgrLocal{ + mType: CLUST_MGR_DEL_LOCAL, + key: key, + respch: respch, + } + + respMsg := <-respch + resp := respMsg.(*MsgClustMgrLocal) + + if err := resp.GetError(); err != nil { + logging.Fatalf("PauseServiceManager::cleanupLocalPauseToken: Unable to delete Pause Token In Local"+ + "Meta Storage. Path[%v] Err[%v]", key, err) + common.CrashOnError(err) + } + + delete(m.pauseTokensById, pauseToken.PauseId) + return nil +}