From ac86ef924795aba71544e178f94a00aa5227a834 Mon Sep 17 00:00:00 2001 From: zhengqi Date: Wed, 14 Mar 2018 17:56:35 +0800 Subject: [PATCH] error handle for creating resources --- go/updater/trainingJobUpdater.go | 61 ++++++++++++++++++-------------- 1 file changed, 34 insertions(+), 27 deletions(-) diff --git a/go/updater/trainingJobUpdater.go b/go/updater/trainingJobUpdater.go index a7302327..908e6000 100644 --- a/go/updater/trainingJobUpdater.go +++ b/go/updater/trainingJobUpdater.go @@ -14,7 +14,8 @@ import ( ) const ( - retryTimes = 5 + retry = 5 + retryTime = 5 * time.Second convertedTimerTicker = 10 * time.Second confirmResourceTicker = 5 * time.Second eventChLength = 1000 @@ -105,7 +106,7 @@ func (updater *TrainingJobUpdater) releaseResource(tp padv1.TrainingResourceType resource.Spec.Replicas = &replica _, err := updater.kubeClient.ExtensionsV1beta1().ReplicaSets(updater.job.Namespace).Update(resource) if errors.IsNotFound(err) { - return nil + return err } key := "paddle-job-" + tp @@ -118,15 +119,14 @@ func (updater *TrainingJobUpdater) releaseResource(tp padv1.TrainingResourceType LabelSelector: selector, } - for j := 0; j <= retryTimes; j++ { + for j := 0; j <= retry; j++ { time.Sleep(confirmResourceTicker) pl, err := updater.kubeClient.CoreV1().Pods(updater.job.Namespace).List(options) if err == nil && len(pl.Items) == 0 { return nil } } - err = updater.kubeClient.CoreV1().Pods(updater.job.Namespace).DeleteCollection(&v1.DeleteOptions{}, options) - return err + return updater.kubeClient.CoreV1().Pods(updater.job.Namespace).DeleteCollection(&v1.DeleteOptions{}, options) } func (updater *TrainingJobUpdater) releaseMaster() error { @@ -212,17 +212,21 @@ func (updater *TrainingJobUpdater) createResource(tp padv1.TrainingResourceType) default: return fmt.Errorf("unknow resource") } - newResource, err := updater.kubeClient.ExtensionsV1beta1().ReplicaSets(updater.job.Namespace).Get(resource.Name, v1.GetOptions{}) - if errors.IsNotFound(err) { - log.Infof("not found to create namespace=%v name=%v resourceName=%v", updater.job.Namespace, updater.job.Name, resource.Name) - newResource, err = updater.kubeClient.ExtensionsV1beta1().ReplicaSets(updater.job.Namespace).Create(resource) - if err != nil { - updater.status.Phase = padv1.TrainingJobPhaseFailed - updater.status.Reason = "Internal error; create resource error:" + err.Error() - return err + for { + _, err := updater.kubeClient.ExtensionsV1beta1().ReplicaSets(updater.job.Namespace).Get(resource.Name, v1.GetOptions{}) + if errors.IsNotFound(err) { + log.Infof("Not found to create namespace=%v name=%v resourceName=%v", updater.job.Namespace, updater.job.Name, resource.Name) + _, err = updater.kubeClient.ExtensionsV1beta1().ReplicaSets(updater.job.Namespace).Create(resource) + if err != nil { + updater.status.Phase = padv1.TrainingJobPhaseFailed + updater.status.Reason = "Internal error; create resource error:" + err.Error() + return err + } + } else if err != nil { + log.Errorf("Get resource error, namespace=%v name=%v resourceName=%v error=%v", updater.job.Namespace, updater.job.Name, resource.Name, err.Error()) + time.Sleep(retryTime) + continue } - } - if newResource != nil { ticker := time.NewTicker(confirmResourceTicker) defer ticker.Stop() for v := range ticker.C { @@ -246,26 +250,29 @@ func (updater *TrainingJobUpdater) createResource(tp padv1.TrainingResourceType) } } } - return nil } func (updater *TrainingJobUpdater) createTrainer() error { - newTrainer, err := updater.kubeClient.BatchV1().Jobs(updater.job.Namespace).Get(updater.job.Spec.Trainer.ReplicaSpec.Name, v1.GetOptions{}) - if errors.IsNotFound(err) { - log.Infof("not found to trainer pserver namespace=%v name=%v", updater.job.Namespace, updater.job.Name) - newTrainer, err = updater.kubeClient.BatchV1().Jobs(updater.job.Namespace).Create(updater.job.Spec.Trainer.ReplicaSpec) - if err != nil { - updater.status.Phase = padv1.TrainingJobPhaseFailed - updater.status.Reason = "Internal error; create trainer error:" + err.Error() - return err + resource := updater.job.Spec.Trainer.ReplicaSpec + for { + _, err := updater.kubeClient.BatchV1().Jobs(updater.job.Namespace).Get(resource.Name, v1.GetOptions{}) + if errors.IsNotFound(err) { + log.Infof("not found to create trainer namespace=%v name=%v", updater.job.Namespace, updater.job.Name) + _, err = updater.kubeClient.BatchV1().Jobs(updater.job.Namespace).Create(resource) + if err != nil { + updater.status.Phase = padv1.TrainingJobPhaseFailed + updater.status.Reason = "Internal error; create trainer error:" + err.Error() + return err + } + } else if err != nil { + log.Errorf("Get resource error, namespace=%v name=%v resourceName=%v error=%v", updater.job.Namespace, updater.job.Name, resource.Name, err.Error()) + time.Sleep(retryTime) + continue } - } - if newTrainer != nil { updater.status.Phase = padv1.TrainingJobPhaseRunning updater.status.Reason = "" return nil } - return nil } func (updater *TrainingJobUpdater) createTrainingJob() error {