Skip to content

Commit

Permalink
MB-54921: Store PauseToken in local meta
Browse files Browse the repository at this point in the history
Also, add cleanup helper function.

Change-Id: Iac4000b772bd8ee6ef7ebfceab50481f20bac07c
  • Loading branch information
akhilmd committed Dec 29, 2022
1 parent 5354a37 commit 6345af8
Showing 1 changed file with 73 additions and 1 deletion.
74 changes: 73 additions & 1 deletion secondary/indexer/pause_service_manager.go
Expand Up @@ -460,7 +460,10 @@ func (m *PauseServiceManager) initStartPhase(bucketName, pauseId string) (err er

m.pauseTokensById[pauseId] = pauseToken

// TODO: Add to local metadata
// Add to local metadata
if err = m.registerLocalPauseToken(pauseToken); err != nil {
return err
}

// TODO: Add to metaKV

Expand Down Expand Up @@ -1012,6 +1015,8 @@ func postWithAuthWrapper(logPrefix string, url string, bodyBuf *bytes.Buffer, ta
// PauseToken
////////////////////////////////////////////////////////////////////////////////////////////////////

const PauseTokenTag = "PauseToken"

type PauseToken struct {
MasterId string
MasterIP string
Expand All @@ -1031,3 +1036,70 @@ func (m *PauseServiceManager) genPauseToken(masterIP, bucketName, pauseId string
PauseId: pauseId,
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////
// PauseToken - Lifecycle
////////////////////////////////////////////////////////////////////////////////////////////////////

func buildKeyForLocalPauseToken(pauseToken *PauseToken) string {
return fmt.Sprintf("%s_%s", PauseTokenTag, pauseToken.PauseId)
}

func (m *PauseServiceManager) registerLocalPauseToken(pauseToken *PauseToken) error {

pToken, err := json.Marshal(pauseToken)
if err != nil {
logging.Errorf("PauseServiceManager::registerLocalPauseToken: Failed to marshal pauseToken[%v]: err[%v]",
pauseToken, err)
return err
}

respch := make(MsgChannel)
m.supvMsgch <- &MsgClustMgrLocal{
mType: CLUST_MGR_SET_LOCAL,
key: buildKeyForLocalPauseToken(pauseToken),
value: string(pToken),
respch: respch,
}

respMsg := <-respch
resp := respMsg.(*MsgClustMgrLocal)

if err = resp.GetError(); err != nil {
logging.Errorf("PauseServiceManager::registerLocalPauseToken: Unable to set PauseToken In Local Meta" +
"Storage: [%v]", err)
return err
}

logging.Infof("PauseServiceManager::registerLocalPauseToken: Registered Pause Token In Local Meta: [%v]",
string(pToken))

return nil
}


func (m *PauseServiceManager) cleanupLocalPauseToken(pauseToken *PauseToken) error {

logging.Infof("PauseServiceManager::cleanupLocalPauseToken: Cleanup PauseToken[%v]", pauseToken)

key := buildKeyForLocalPauseToken(pauseToken)

respch := make(MsgChannel)
m.supvMsgch <- &MsgClustMgrLocal{
mType: CLUST_MGR_DEL_LOCAL,
key: key,
respch: respch,
}

respMsg := <-respch
resp := respMsg.(*MsgClustMgrLocal)

if err := resp.GetError(); err != nil {
logging.Fatalf("PauseServiceManager::cleanupLocalPauseToken: Unable to delete Pause Token In Local"+
"Meta Storage. Path[%v] Err[%v]", key, err)
common.CrashOnError(err)
}

delete(m.pauseTokensById, pauseToken.PauseId)
return nil
}

0 comments on commit 6345af8

Please sign in to comment.