Skip to content

Commit

Permalink
error handle for creating resources
Browse files Browse the repository at this point in the history
  • Loading branch information
qizheng09 committed Mar 14, 2018
1 parent e46e836 commit ac86ef9
Showing 1 changed file with 34 additions and 27 deletions.
61 changes: 34 additions & 27 deletions go/updater/trainingJobUpdater.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import (
)

const (
retryTimes = 5
retry = 5
retryTime = 5 * time.Second
convertedTimerTicker = 10 * time.Second
confirmResourceTicker = 5 * time.Second
eventChLength = 1000
Expand Down Expand Up @@ -105,7 +106,7 @@ func (updater *TrainingJobUpdater) releaseResource(tp padv1.TrainingResourceType
resource.Spec.Replicas = &replica
_, err := updater.kubeClient.ExtensionsV1beta1().ReplicaSets(updater.job.Namespace).Update(resource)
if errors.IsNotFound(err) {
return nil
return err
}
key := "paddle-job-" + tp

Expand All @@ -118,15 +119,14 @@ func (updater *TrainingJobUpdater) releaseResource(tp padv1.TrainingResourceType
LabelSelector: selector,
}

for j := 0; j <= retryTimes; j++ {
for j := 0; j <= retry; j++ {
time.Sleep(confirmResourceTicker)
pl, err := updater.kubeClient.CoreV1().Pods(updater.job.Namespace).List(options)
if err == nil && len(pl.Items) == 0 {
return nil
}
}
err = updater.kubeClient.CoreV1().Pods(updater.job.Namespace).DeleteCollection(&v1.DeleteOptions{}, options)
return err
return updater.kubeClient.CoreV1().Pods(updater.job.Namespace).DeleteCollection(&v1.DeleteOptions{}, options)
}

func (updater *TrainingJobUpdater) releaseMaster() error {
Expand Down Expand Up @@ -212,17 +212,21 @@ func (updater *TrainingJobUpdater) createResource(tp padv1.TrainingResourceType)
default:
return fmt.Errorf("unknow resource")
}
newResource, err := updater.kubeClient.ExtensionsV1beta1().ReplicaSets(updater.job.Namespace).Get(resource.Name, v1.GetOptions{})
if errors.IsNotFound(err) {
log.Infof("not found to create namespace=%v name=%v resourceName=%v", updater.job.Namespace, updater.job.Name, resource.Name)
newResource, err = updater.kubeClient.ExtensionsV1beta1().ReplicaSets(updater.job.Namespace).Create(resource)
if err != nil {
updater.status.Phase = padv1.TrainingJobPhaseFailed
updater.status.Reason = "Internal error; create resource error:" + err.Error()
return err
for {
_, err := updater.kubeClient.ExtensionsV1beta1().ReplicaSets(updater.job.Namespace).Get(resource.Name, v1.GetOptions{})
if errors.IsNotFound(err) {
log.Infof("Not found to create namespace=%v name=%v resourceName=%v", updater.job.Namespace, updater.job.Name, resource.Name)
_, err = updater.kubeClient.ExtensionsV1beta1().ReplicaSets(updater.job.Namespace).Create(resource)
if err != nil {
updater.status.Phase = padv1.TrainingJobPhaseFailed
updater.status.Reason = "Internal error; create resource error:" + err.Error()
return err
}
} else if err != nil {
log.Errorf("Get resource error, namespace=%v name=%v resourceName=%v error=%v", updater.job.Namespace, updater.job.Name, resource.Name, err.Error())
time.Sleep(retryTime)
continue
}
}
if newResource != nil {
ticker := time.NewTicker(confirmResourceTicker)
defer ticker.Stop()
for v := range ticker.C {
Expand All @@ -246,26 +250,29 @@ func (updater *TrainingJobUpdater) createResource(tp padv1.TrainingResourceType)
}
}
}
return nil
}

func (updater *TrainingJobUpdater) createTrainer() error {
newTrainer, err := updater.kubeClient.BatchV1().Jobs(updater.job.Namespace).Get(updater.job.Spec.Trainer.ReplicaSpec.Name, v1.GetOptions{})
if errors.IsNotFound(err) {
log.Infof("not found to trainer pserver namespace=%v name=%v", updater.job.Namespace, updater.job.Name)
newTrainer, err = updater.kubeClient.BatchV1().Jobs(updater.job.Namespace).Create(updater.job.Spec.Trainer.ReplicaSpec)
if err != nil {
updater.status.Phase = padv1.TrainingJobPhaseFailed
updater.status.Reason = "Internal error; create trainer error:" + err.Error()
return err
resource := updater.job.Spec.Trainer.ReplicaSpec
for {
_, err := updater.kubeClient.BatchV1().Jobs(updater.job.Namespace).Get(resource.Name, v1.GetOptions{})
if errors.IsNotFound(err) {
log.Infof("not found to create trainer namespace=%v name=%v", updater.job.Namespace, updater.job.Name)
_, err = updater.kubeClient.BatchV1().Jobs(updater.job.Namespace).Create(resource)
if err != nil {
updater.status.Phase = padv1.TrainingJobPhaseFailed
updater.status.Reason = "Internal error; create trainer error:" + err.Error()
return err
}
} else if err != nil {
log.Errorf("Get resource error, namespace=%v name=%v resourceName=%v error=%v", updater.job.Namespace, updater.job.Name, resource.Name, err.Error())
time.Sleep(retryTime)
continue
}
}
if newTrainer != nil {
updater.status.Phase = padv1.TrainingJobPhaseRunning
updater.status.Reason = ""
return nil
}
return nil
}

func (updater *TrainingJobUpdater) createTrainingJob() error {
Expand Down

0 comments on commit ac86ef9

Please sign in to comment.