From 9bc846460b7e3c7bc70915e0afe4a572eabe7491 Mon Sep 17 00:00:00 2001 From: akhilmd Date: Mon, 16 Jan 2023 21:39:19 +0530 Subject: [PATCH] MB-54938: Add pause done callback skeleton - run cleanup at the end of token lifecycle - move call to endTask into pause service manager Change-Id: I7708fb335d7ec76d330603fb5750e162bce054da --- secondary/indexer/pause_pauser.go | 22 ++++++++-- secondary/indexer/pause_service_manager.go | 51 +++++++++++++++++++++- 2 files changed, 67 insertions(+), 6 deletions(-) diff --git a/secondary/indexer/pause_pauser.go b/secondary/indexer/pause_pauser.go index 960923e14..33563cc05 100644 --- a/secondary/indexer/pause_pauser.go +++ b/secondary/indexer/pause_pauser.go @@ -145,6 +145,9 @@ func setPauseUploadTokenInMetakv(putId string, put *PauseUploadToken) { // This is used only on the master node of a task_PAUSE task to do the GSI orchestration. //////////////////////////////////////////////////////////////////////////////////////////////////// +// Called at the end of the pause lifecycle. It takes pauseId and any error as input. +type PauseDoneCallback func(string, error) + // Pauser object holds the state of Pause orchestration type Pauser struct { // nodeDir is "node_/" for this node, where nodeId is the 32-digit hex ID from ns_server @@ -183,6 +186,7 @@ type Pauser struct { // For cleanup retErr error cleanupOnce sync.Once + doneCb PauseDoneCallback } // NewPauser creates a Pauser instance to execute the given task. It saves a pointer to itself in @@ -190,8 +194,11 @@ type Pauser struct { // // pauseMgr - parent object (singleton) // task - the task_PAUSE task this object will execute -// master - true iff this node is the master -func NewPauser(pauseMgr *PauseServiceManager, task *taskObj, pauseToken *PauseToken) *Pauser { +// pauseToken - global PauseToken +// doneCb - callback that initiates the cleanup phase +func NewPauser(pauseMgr *PauseServiceManager, task *taskObj, pauseToken *PauseToken, + doneCb PauseDoneCallback) *Pauser { + pauser := &Pauser{ pauseMgr: pauseMgr, task: task, @@ -203,6 +210,8 @@ func NewPauser(pauseMgr *PauseServiceManager, task *taskObj, pauseToken *PauseTo masterTokens: make(map[string]*PauseUploadToken), followerTokens: make(map[string]*PauseUploadToken), + + doneCb: doneCb, } task.taskMu.Lock() @@ -359,6 +368,9 @@ func (p *Pauser) processUploadTokens(kve metakv.KVEntry) error { logging.Infof("Pauser::processUploadTokens: PauseToken path[%v] value[%s]", kve.Path, kve.Value) if kve.Value == nil { + // During cleanup, PauseToken is deleted by master and serves as a signal for + // all observers on followers to stop. + logging.Infof("Pauser::processUploadTokens: PauseToken Deleted. Mark Done.") p.cancelMetakv() p.finishPause(nil) @@ -533,8 +545,10 @@ func (p *Pauser) doFinish() { p.Cleanup() p.wg.Wait() - p.pauseMgr.endTask(p.retErr,p.task.taskId) - // TODO: call done callback to start the cleanup phase + // TODO: move this to inside the done callback + + // call done callback to start the cleanup phase + p.doneCb(p.pauseToken.PauseId, p.retErr) } func (p *Pauser) processPauseUploadTokenAsFollower(putId string, put *PauseUploadToken) bool { diff --git a/secondary/indexer/pause_service_manager.go b/secondary/indexer/pause_service_manager.go index 00d49e11b..07dbf4ed9 100644 --- a/secondary/indexer/pause_service_manager.go +++ b/secondary/indexer/pause_service_manager.go @@ -649,14 +649,16 @@ func (m *PauseServiceManager) Pause(params service.PauseParams) (err error) { } if err := m.initStartPhase(params.Bucket, params.ID); err != nil { + m.runPauseCleanupPhase(params.ID, task.isMaster()) return err } // Create a Pauser object to run the master orchestration loop. - pauser := NewPauser(m, task, m.pauseTokensById[params.ID]) + pauser := NewPauser(m, task, m.pauseTokensById[params.ID], m.pauseDoneCallback) if err := m.setPauser(params.ID, pauser); err != nil { + m.runPauseCleanupPhase(params.ID, task.isMaster()) return err } @@ -715,6 +717,51 @@ func (m *PauseServiceManager) initStartPhase(bucketName, pauseId string) (err er return nil } +// pauseDoneCallback is the Pauser.cb.done callback function. +// Upload work is interrupted based on pauseId, using cancel ctx from task in pauser. +func (m *PauseServiceManager) pauseDoneCallback(pauseId string, err error) { + + pauser, exists := m.getPauser(pauseId) + if !exists { + logging.Errorf("PauseServiceManager::pauseDoneCallback: Failed to find Pauser with pauseId[%v]", pauseId) + return + } + + // If there is an error, set it in the task, otherwise, delete task from task list. + // TODO: Check if follower task should be handled differently. + m.endTask(err, pauseId) + + isMaster := pauser.task.isMaster() + + if err := m.runPauseCleanupPhase(pauseId, isMaster); err != nil { + logging.Errorf("PauseServiceManager::pauseDoneCallback: Failed to run cleanup: err[%v]", err) + return + } + + if err := m.setPauser(pauseId, nil); err != nil { + logging.Errorf("PauseServiceManager::pauseDoneCallback: Failed to run cleanup: err[%v]", err) + return + } + + logging.Infof("PauseServiceManager::pauseDoneCallback Pause Done: isMaster %v, err: %v", + isMaster, err) +} + +func (m *PauseServiceManager) runPauseCleanupPhase(pauseId string, isMaster bool) error { + + logging.Infof("PauseServiceManager::runPauseCleanupPhase pauseId[%v] isMaster[%v]", pauseId, isMaster) + + if isMaster { + // TODO: cleanup global master token + } + + // TODO: Get tokens and cleanup PauseUploadTokens + + // TODO: Cleanup pause token in local meta + + return nil +} + // PrepareResume is an external API called by ns_server (via cbauth) on all index nodes before // calling Resume on leader only. The Resume call following PrepareResume will be given the SAME // dryRun value. The full sequence is: @@ -1020,7 +1067,7 @@ func (m *PauseServiceManager) RestHandlePause(w http.ResponseWriter, r *http.Req return } - pauser := NewPauser(m, task, &pauseToken) + pauser := NewPauser(m, task, &pauseToken, m.pauseDoneCallback) if err := m.setPauser(pauseToken.PauseId, pauser); err != nil { logging.Errorf("PauseServiceManager::RestHandlePause: Failed to set Pauser in bookkeeping" +