Skip to content

Commit

Permalink
MB-54938: Add pause done callback skeleton
Browse files Browse the repository at this point in the history
- run cleanup at the end of token lifecycle
- move call to endTask into pause service manager

Change-Id: I7708fb335d7ec76d330603fb5750e162bce054da
  • Loading branch information
akhilmd committed Jan 18, 2023
1 parent fcf1f86 commit 9bc8464
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 6 deletions.
22 changes: 18 additions & 4 deletions secondary/indexer/pause_pauser.go
Expand Up @@ -145,6 +145,9 @@ func setPauseUploadTokenInMetakv(putId string, put *PauseUploadToken) {
// This is used only on the master node of a task_PAUSE task to do the GSI orchestration.
////////////////////////////////////////////////////////////////////////////////////////////////////

// Called at the end of the pause lifecycle. It takes pauseId and any error as input.
type PauseDoneCallback func(string, error)

// Pauser object holds the state of Pause orchestration
type Pauser struct {
// nodeDir is "node_<nodeId>/" for this node, where nodeId is the 32-digit hex ID from ns_server
Expand Down Expand Up @@ -183,15 +186,19 @@ type Pauser struct {
// For cleanup
retErr error
cleanupOnce sync.Once
doneCb PauseDoneCallback
}

// NewPauser creates a Pauser instance to execute the given task. It saves a pointer to itself in
// task.pauser (visible to pauseMgr parent) and launches a goroutine for the work.
//
// pauseMgr - parent object (singleton)
// task - the task_PAUSE task this object will execute
// master - true iff this node is the master
func NewPauser(pauseMgr *PauseServiceManager, task *taskObj, pauseToken *PauseToken) *Pauser {
// pauseToken - global PauseToken
// doneCb - callback that initiates the cleanup phase
func NewPauser(pauseMgr *PauseServiceManager, task *taskObj, pauseToken *PauseToken,
doneCb PauseDoneCallback) *Pauser {

pauser := &Pauser{
pauseMgr: pauseMgr,
task: task,
Expand All @@ -203,6 +210,8 @@ func NewPauser(pauseMgr *PauseServiceManager, task *taskObj, pauseToken *PauseTo

masterTokens: make(map[string]*PauseUploadToken),
followerTokens: make(map[string]*PauseUploadToken),

doneCb: doneCb,
}

task.taskMu.Lock()
Expand Down Expand Up @@ -359,6 +368,9 @@ func (p *Pauser) processUploadTokens(kve metakv.KVEntry) error {
logging.Infof("Pauser::processUploadTokens: PauseToken path[%v] value[%s]", kve.Path, kve.Value)

if kve.Value == nil {
// During cleanup, PauseToken is deleted by master and serves as a signal for
// all observers on followers to stop.

logging.Infof("Pauser::processUploadTokens: PauseToken Deleted. Mark Done.")
p.cancelMetakv()
p.finishPause(nil)
Expand Down Expand Up @@ -533,8 +545,10 @@ func (p *Pauser) doFinish() {
p.Cleanup()
p.wg.Wait()

p.pauseMgr.endTask(p.retErr,p.task.taskId)
// TODO: call done callback to start the cleanup phase
// TODO: move this to inside the done callback

// call done callback to start the cleanup phase
p.doneCb(p.pauseToken.PauseId, p.retErr)
}

func (p *Pauser) processPauseUploadTokenAsFollower(putId string, put *PauseUploadToken) bool {
Expand Down
51 changes: 49 additions & 2 deletions secondary/indexer/pause_service_manager.go
Expand Up @@ -649,14 +649,16 @@ func (m *PauseServiceManager) Pause(params service.PauseParams) (err error) {
}

if err := m.initStartPhase(params.Bucket, params.ID); err != nil {
m.runPauseCleanupPhase(params.ID, task.isMaster())
return err
}

// Create a Pauser object to run the master orchestration loop.

pauser := NewPauser(m, task, m.pauseTokensById[params.ID])
pauser := NewPauser(m, task, m.pauseTokensById[params.ID], m.pauseDoneCallback)

if err := m.setPauser(params.ID, pauser); err != nil {
m.runPauseCleanupPhase(params.ID, task.isMaster())
return err
}

Expand Down Expand Up @@ -715,6 +717,51 @@ func (m *PauseServiceManager) initStartPhase(bucketName, pauseId string) (err er
return nil
}

// pauseDoneCallback is the Pauser.cb.done callback function.
// Upload work is interrupted based on pauseId, using cancel ctx from task in pauser.
func (m *PauseServiceManager) pauseDoneCallback(pauseId string, err error) {

pauser, exists := m.getPauser(pauseId)
if !exists {
logging.Errorf("PauseServiceManager::pauseDoneCallback: Failed to find Pauser with pauseId[%v]", pauseId)
return
}

// If there is an error, set it in the task, otherwise, delete task from task list.
// TODO: Check if follower task should be handled differently.
m.endTask(err, pauseId)

isMaster := pauser.task.isMaster()

if err := m.runPauseCleanupPhase(pauseId, isMaster); err != nil {
logging.Errorf("PauseServiceManager::pauseDoneCallback: Failed to run cleanup: err[%v]", err)
return
}

if err := m.setPauser(pauseId, nil); err != nil {
logging.Errorf("PauseServiceManager::pauseDoneCallback: Failed to run cleanup: err[%v]", err)
return
}

logging.Infof("PauseServiceManager::pauseDoneCallback Pause Done: isMaster %v, err: %v",
isMaster, err)
}

func (m *PauseServiceManager) runPauseCleanupPhase(pauseId string, isMaster bool) error {

logging.Infof("PauseServiceManager::runPauseCleanupPhase pauseId[%v] isMaster[%v]", pauseId, isMaster)

if isMaster {
// TODO: cleanup global master token
}

// TODO: Get tokens and cleanup PauseUploadTokens

// TODO: Cleanup pause token in local meta

return nil
}

// PrepareResume is an external API called by ns_server (via cbauth) on all index nodes before
// calling Resume on leader only. The Resume call following PrepareResume will be given the SAME
// dryRun value. The full sequence is:
Expand Down Expand Up @@ -1020,7 +1067,7 @@ func (m *PauseServiceManager) RestHandlePause(w http.ResponseWriter, r *http.Req
return
}

pauser := NewPauser(m, task, &pauseToken)
pauser := NewPauser(m, task, &pauseToken, m.pauseDoneCallback)

if err := m.setPauser(pauseToken.PauseId, pauser); err != nil {
logging.Errorf("PauseServiceManager::RestHandlePause: Failed to set Pauser in bookkeeping" +
Expand Down

0 comments on commit 9bc8464

Please sign in to comment.