lifecycle management of a trainingJob #634

Merged
merged 4 commits into from
Mar 15, 2018

Collaborator

@qizheng09 qizheng09 commented Mar 5, 2018

@typhoonzero Hi, could you please review this PR?
It contains the main lifecycle process of a TrainingJob, including:

  1. creating resources;
  2. maintaining a state machine;
  3. releasing resources;

@qizheng09
Collaborator Author

@typhoonzero
This PR will pass Travis CI once #635 is merged.

@qizheng09 qizheng09 force-pushed the lifecycle_managerment branch 2 times, most recently from d8cf99e to fc110b2 Compare March 6, 2018 13:44
@typhoonzero
Collaborator

@qizheng09 you can simply add "Reviewers" on the right side of this page, so that they can receive an email notification.

job: job,
kubeClient: kubeClient,
trainingJobClient: trainingJobClient,
status: job.Status,
eventCh: make(chan *trainingJobEvent, 1000),
}
- return jobber, nil
+ return updater, nil
}

// NewUpdater return a trainingJobUpdater for controller.
Collaborator

NewUpdater return a trainingJobUpdater for controller. => NewUpdater creates a new TrainingJobUpdater and starts a goroutine to control the current job.

}

// NewUpdater return a trainingJobUpdater for controller.
func NewUpdater(job *padv1.TrainingJob, kubeClient kubernetes.Interface, trainingJobClient trainingJobClient.Interface) (*TrainingJobUpdater,
error) {
log.Infof("NewJobber namespace=%v name=%v", job.Namespace, job.Name)
- jobber, err := initUpdater(job, kubeClient, trainingJobClient)
+ updater, err := initUpdater(job, kubeClient, trainingJobClient)
Collaborator

This may not need a separate function call; initUpdater is quite short.

return fmt.Errorf("unknow resource")
}
var replica int32
replica = 0
Collaborator

var replica int32
replica = 0

is the same as replica := 0
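One caveat on the shortened form: in Go, `replica := 0` infers the type `int`, not `int32`. The diff later assigns `&replica` to `resource.Spec.Replicas`, so the short form would likely need an explicit conversion. A minimal sketch, where the helper name `replicaPointer` is made up for illustration:

```go
package main

import "fmt"

// replicaPointer returns a pointer to an int32 zero value.
// The name is hypothetical; it only illustrates the short declaration form.
func replicaPointer() *int32 {
	replica := int32(0) // explicit conversion keeps the type int32
	return &replica
}

func main() {
	inferred := 0 // the untyped constant 0 defaults to int here
	fmt.Printf("%T %T\n", inferred, *replicaPointer())
}
```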


err = updater.kubeClient.CoreV1().Pods(updater.job.Namespace).DeleteCollection(&v1.DeleteOptions{}, options)
if err != nil {
return fmt.Errorf("release resource failed , namespace=%v name=%v resourceName=%v", updater.job.Namespace, updater.job.Name, resource.Name)
Collaborator

Can return this err directly.

for j := 0; j <= retryTimes; j++ {
time.Sleep(confirmResourceTicker)
pl, err := updater.kubeClient.CoreV1().Pods(updater.job.Namespace).List(options)
if err == nil && len(pl.Items) == 0 {
Collaborator

We need to handle the case where the error still exists after retrying.

Collaborator Author

@qizheng09 qizheng09 Mar 7, 2018

The retry is used to make sure that the replicas of the resource have been scaled to 0. If the error still exists or len(pl.Items) has not dropped to 0 after retrying, I will force-delete the pods, as shown below.
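The retry-then-report pattern discussed here can be sketched as follows. This is not the PR's code: `waitUntilEmpty` and its `count` callback are hypothetical stand-ins for the pod-listing call, and the `%w` verb assumes Go 1.13 or later. The function returns the last failure so the caller can decide to force-delete.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitUntilEmpty polls count up to retries+1 times, sleeping interval before
// each attempt. It returns nil as soon as count reports zero items without an
// error; otherwise it returns an error describing the last failed attempt.
func waitUntilEmpty(count func() (int, error), retries int, interval time.Duration) error {
	var lastErr error
	for i := 0; i <= retries; i++ {
		time.Sleep(interval)
		n, err := count()
		if err == nil && n == 0 {
			return nil
		}
		if err != nil {
			lastErr = err
		} else {
			lastErr = fmt.Errorf("%d items still present", n)
		}
	}
	return fmt.Errorf("not released after %d attempts: %w", retries+1, lastErr)
}

func main() {
	calls := 0
	// Simulated listing call: fails twice, then reports an empty list.
	err := waitUntilEmpty(func() (int, error) {
		calls++
		if calls < 3 {
			return 0, errors.New("temporary API error")
		}
		return 0, nil
	}, 5, time.Millisecond)
	fmt.Println(err, calls)
}
```

A caller that receives a non-nil error here could fall back to the forced deletion the author describes.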

return nil
}

func (updater *TrainingJobUpdater) releaseMaPs() error {
Collaborator

The function name releaseMaPs does not make sense; it needs a clearer name.

}

// SetUp validates the fields and parses the TrainingJob
func (updater *TrainingJobUpdater) setUp() {
Collaborator

Just call this parseTrainingJob

return err
}

func (updater *TrainingJobUpdater) releaseMasterPserver() error {
Collaborator

How about splitting releaseMasterPserver into two functions, releaseMaster and releasePserver?

Collaborator Author

Okay~

}

err := updater.kubeClient.CoreV1().Pods(updater.job.Namespace).DeleteCollection(&v1.DeleteOptions{}, options)
if err != nil {
Collaborator

It looks like there is no need to check err != nil; just:

return updater.kubeClient.CoreV1().Pods(updater.job.Namespace).DeleteCollection(&v1.DeleteOptions{}, options)

return err
}

err := updater.createTrainer()
Collaborator

return updater.createTrainer()

}

func (updater *TrainingJobUpdater) createTrainingJob() error {

Collaborator

Delete redundant blank line.

Collaborator Author

@Yancey1989 Done and thanks for your comments~

// it will notify updater to process the event. It send event to updater's eventCh.
func (updater *TrainingJobUpdater) notify(te *trainingJobEvent) {
select {
case updater.eventCh <- te:
Collaborator

@helinwang helinwang Mar 12, 2018

Is this code the same as:

// no more select
updater.eventCh <- te
// code in the case block.

Maybe the above is less verbose?
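For reference, a `select` with a single send case and no `default` blocks exactly like a plain send, so the two forms behave the same. The equivalence would not hold if the `select` had a `default` branch, which makes the send non-blocking. A small sketch with hypothetical names (`event`, `notifyWithSelect`, `notifyPlain`):

```go
package main

import "fmt"

// event is a hypothetical stand-in for trainingJobEvent.
type event struct{ name string }

// notifyWithSelect sends through a single-case select without a default,
// so it blocks until the channel accepts the value.
func notifyWithSelect(ch chan<- *event, e *event) {
	select {
	case ch <- e:
	}
}

// notifyPlain performs the same blocking send without the select wrapper.
func notifyPlain(ch chan<- *event, e *event) {
	ch <- e
}

func main() {
	ch := make(chan *event, 2)
	notifyWithSelect(ch, &event{name: "a"})
	notifyPlain(ch, &event{name: "b"})
	fmt.Println(len(ch))
}
```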

Collaborator Author

Done

select {
case updater.eventCh <- te:
lene, cape := len(updater.eventCh), cap(updater.eventCh)
if lene > int(float64(cape)*0.8) {
Collaborator

Maybe define 0.8 as a constant in this file.
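One way to follow this suggestion is a named constant plus a small helper, sketched below. The names `eventChWarnRatio` and `nearlyFull` are invented for illustration and are not taken from the PR.

```go
package main

import "fmt"

// eventChWarnRatio is a hypothetical name for the fill ratio above which
// the event channel is considered close to full.
const eventChWarnRatio = 0.8

// nearlyFull reports whether length exceeds the warning share of capacity.
func nearlyFull(length, capacity int) bool {
	return length > int(float64(capacity)*eventChWarnRatio)
}

func main() {
	ch := make(chan int, 10)
	for i := 0; i < 9; i++ {
		ch <- i
	}
	fmt.Println(nearlyFull(len(ch), cap(ch)))
}
```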

Collaborator Author

Done

for v := range ticker.C {
rs, err := updater.kubeClient.ExtensionsV1beta1().ReplicaSets(updater.job.Namespace).Get(resource.Name, v1.GetOptions{})
log.Infof("Current time %v runing pod is %v, resourceName=%v", v.String(), rs.Status.ReadyReplicas, resource.Name)
if err != nil && !errors.IsServerTimeout(err) && !errors.IsTooManyRequests(err) {
Collaborator

What will happen if err is timeout or too many requests? I see the code below uses the returned rs even when the err happened.
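The concern can be illustrated with a sketch that only dereferences the result when the call succeeded and treats transient errors as a signal to retry on the next tick. Everything here is a stand-in: `replicaSet`, `errTransient`, and `readyReplicas` are hypothetical, and `errors.Is` assumes Go 1.13 or later. Real code would use the Kubernetes client types and helpers such as `errors.IsServerTimeout` from the diff.

```go
package main

import (
	"errors"
	"fmt"
)

// replicaSet is a hypothetical stand-in for the Kubernetes ReplicaSet object.
type replicaSet struct{ ReadyReplicas int32 }

// errTransient is a hypothetical stand-in for a timeout or
// too-many-requests error returned by the API server.
var errTransient = errors.New("server timeout")

// readyReplicas calls get and reads the result only when err is nil.
// It returns the ready count, whether the caller should retry on the next
// tick, and any non-retryable error.
func readyReplicas(get func() (*replicaSet, error)) (int32, bool, error) {
	rs, err := get()
	if err != nil {
		if errors.Is(err, errTransient) {
			return 0, true, nil // transient: skip this tick without touching rs
		}
		return 0, false, err
	}
	return rs.ReadyReplicas, false, nil
}

func main() {
	ok := func() (*replicaSet, error) { return &replicaSet{ReadyReplicas: 3}, nil }
	busy := func() (*replicaSet, error) { return nil, errTransient }
	fmt.Println(readyReplicas(ok))
	fmt.Println(readyReplicas(busy))
}
```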

Collaborator Author

Done.

func (updater *TrainingJobUpdater) parseTrainingJob() {
if updater.job == nil {
updater.status.Phase = padv1.TrainingJobPhaseFailed
updater.status.Reason = "Internal error; Setup error; job is missing TainingJob"
Collaborator

Maybe we need to return here?

Collaborator Author

Done

switch ev.pet {
case trainingJobEventDelete:
log.Infof("Delete updater, namespace=%v name=%v: ", updater.job.Namespace, updater.job.Name)
ticker.Stop()
Collaborator

There is already defer ticker.Stop() above, maybe it's not necessary?

Collaborator Author

Done

}

if updater.status.Phase == padv1.TrainingJobPhaseSucceeded || updater.status.Phase == padv1.TrainingJobPhaseFailed {
if ticker != nil {
Collaborator

Maybe move this ticker-stopping logic to line 466; the Convert function does not have much to do with the ticker.

Collaborator Author

Done. Thank you very much for your comments, and I have fixed all of the comments you mentioned above.

resource.Spec.Replicas = &replica
_, err := updater.kubeClient.ExtensionsV1beta1().ReplicaSets(updater.job.Namespace).Update(resource)
if errors.IsNotFound(err) {
return nil
Collaborator

Maybe return err here?

Collaborator Author

Done

return err
}
}
if newResource != nil {
Collaborator

Collaborator Author

Our goal is to try our best to ensure the normal operation of the job. In case of unexpected errors, should we deal with other errors by retrying?

Collaborator Author

Done


func (updater *TrainingJobUpdater) createTrainer() error {
newTrainer, err := updater.kubeClient.BatchV1().Jobs(updater.job.Namespace).Get(updater.job.Spec.Trainer.ReplicaSpec.Name, v1.GetOptions{})
if errors.IsNotFound(err) {
Collaborator

Collaborator Author

Done. Thank you very much for your comments, and I have fixed all of the comments you mentioned above.

Collaborator

@typhoonzero typhoonzero left a comment

LGTM. The naming needs some polishing later on.

log.Infof("End to delete TrainingJob namespace=%v name=%v", updater.job.Namespace, updater.job.Name)

if fault {
return fmt.Errorf("delete resource error namespace=%v name=%v", updater.job.Namespace, updater.job.Name)
Collaborator

This may lose the original k8s API call error message; return the error object from the releaseXXX call instead.
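One possible way to keep both the job context and the underlying API error is to wrap the error rather than replace it. The sketch below assumes Go 1.13 or later for the `%w` verb; `releaseResource` and `errAPI` are hypothetical stand-ins for a releaseXXX call and the error it might return.

```go
package main

import (
	"errors"
	"fmt"
)

// errAPI is a hypothetical stand-in for an error returned by a k8s API call.
var errAPI = errors.New("pods \"trainer\" is forbidden")

// releaseResource is a hypothetical stand-in for one of the releaseXXX calls.
func releaseResource() error { return errAPI }

// deleteTrainingJob adds namespace and name for context while wrapping the
// original error, so callers can still inspect it with errors.Is or errors.As.
func deleteTrainingJob(namespace, name string) error {
	if err := releaseResource(); err != nil {
		return fmt.Errorf("delete resource error namespace=%v name=%v: %w", namespace, name, err)
	}
	return nil
}

func main() {
	err := deleteTrainingJob("default", "example-job")
	fmt.Println(err)
	fmt.Println(errors.Is(err, errAPI))
}
```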

5 participants