From 54485206e5106f3820976556520fa0029d4dae4b Mon Sep 17 00:00:00 2001 From: m3ngyang Date: Sat, 17 Mar 2018 16:20:40 +0800 Subject: [PATCH 1/6] jobparser for crd TrainingJob --- .gitignore | 1 + pkg/apis/paddlepaddle/v1/trainingjob.go | 29 ++ pkg/apis/paddlepaddle/v1/types.go | 12 +- pkg/updater/jobparser.go | 337 ++++++++++++++++++++++++ 4 files changed, 373 insertions(+), 6 deletions(-) create mode 100644 pkg/apis/paddlepaddle/v1/trainingjob.go create mode 100644 pkg/updater/jobparser.go diff --git a/.gitignore b/.gitignore index 3c93ba89..2dd54b01 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ *~ vendor/ .glide/ +.vscode/ diff --git a/pkg/apis/paddlepaddle/v1/trainingjob.go b/pkg/apis/paddlepaddle/v1/trainingjob.go new file mode 100644 index 00000000..eb2e2c1b --- /dev/null +++ b/pkg/apis/paddlepaddle/v1/trainingjob.go @@ -0,0 +1,29 @@ +package v1 + +import "encoding/json" + +// Elastic returns true if the job can scale to more workers. +func (s *TrainingJob) Elastic() bool { + return s.Spec.Trainer.MinInstance < s.Spec.Trainer.MaxInstance +} + +// GPU convert Resource Limit Quantity to int +func (s *TrainingJob) GPU() int { + q := s.Spec.Trainer.Resources.Limits.NvidiaGPU() + gpu, ok := q.AsInt64() + if !ok { + // FIXME: treat errors + gpu = 0 + } + return int(gpu) +} + +// NeedGPU returns true if the job need GPU resource to run. +func (s *TrainingJob) NeedGPU() bool { + return s.GPU() > 0 +} + +func (s *TrainingJob) String() string { + b, _ := json.MarshalIndent(s, "", " ") + return string(b[:]) +} diff --git a/pkg/apis/paddlepaddle/v1/types.go b/pkg/apis/paddlepaddle/v1/types.go index fe47074f..3b91280d 100644 --- a/pkg/apis/paddlepaddle/v1/types.go +++ b/pkg/apis/paddlepaddle/v1/types.go @@ -111,12 +111,12 @@ type TrainerJobScaleStatus struct { type TrainingResourceType string const ( - // MASTER is the master name of TrainingResourceType. - MASTER TrainingResourceType = "MASTER" - // PSERVER is the pserver name of TrainingResourceType. - PSERVER TrainingResourceType = "PSERVER" - // TRAINER is the trainer name of TrainingResourceType. - TRAINER TrainingResourceType = "TRAINER" + // Master is the master name of TrainingResourceType. + Master TrainingResourceType = "MASTER" + // Pserver is the pserver name of TrainingResourceType. + Pserver TrainingResourceType = "PSERVER" + // Trainer is the trainer name of TrainingResourceType. + Trainer TrainingResourceType = "TRAINER" ) // ResourceState is the state of a type of resource diff --git a/pkg/updater/jobparser.go b/pkg/updater/jobparser.go new file mode 100644 index 00000000..a37b7443 --- /dev/null +++ b/pkg/updater/jobparser.go @@ -0,0 +1,337 @@ +/* Copyright (c) 2016 PaddlePaddle Authors All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. */ + +package updater + +import ( + "errors" + "fmt" + "strconv" + + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + v1beta1 "k8s.io/api/extensions/v1beta1" + apiresource "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + paddlev1 "github.com/paddlepaddle/edl/pkg/apis/paddlepaddle/v1" +) + +const ( + imagePullPolicy = "Always" +) + +// JobParser is a interface can parse given simple TrainingJob struct to +// an integrated TrainingJob +type JobParser interface { + Validate(job *paddlev1.TrainingJob) error + ParseToTrainingJob(job *paddlev1.TrainingJob) *paddlev1.TrainingJob + parseToTrainer(job *paddlev1.TrainingJob) *batchv1.Job + parseToPserver(job *paddlev1.TrainingJob) *v1beta1.ReplicaSet + parseToMaster(job *paddlev1.TrainingJob) *v1beta1.ReplicaSet +} + +// DefaultJobParser implement a basic JobParser. +type DefaultJobParser int + +// Validate updates default values for the added job and validates the fields. +func (p *DefaultJobParser) Validate(job *paddlev1.TrainingJob) error { + // Fill in default values + // FIXME: Need to test. What is the value if specified "omitempty" + if job.Spec.Port == 0 { + job.Spec.Port = 7164 + } + if job.Spec.PortsNum == 0 { + job.Spec.PortsNum = 1 + } + if job.Spec.PortsNumForSparse == 0 { + job.Spec.PortsNumForSparse = 1 + } + if job.Spec.Image == "" { + job.Spec.Image = "paddlepaddle/paddlecloud-job" + } + if job.Spec.Passes == 0 { + job.Spec.Passes = 1 + } + + if !job.Spec.FaultTolerant && job.Elastic() { + return errors.New("max-instances should equal to min-instances when fault_tolerant is disabled") + } + // TODO: add validations. + return nil +} + +// ParseToTrainingJob generates a whole structure of TrainingJob +func (p *DefaultJobParser) ParseToTrainingJob(job *paddlev1.TrainingJob) *paddlev1.TrainingJob { + useHostNetwork := job.Spec.HostNetwork + if job.Spec.FaultTolerant { + job.Spec.Master.ReplicaSpec = p.parseToMaster(job) + if useHostNetwork { + job.Spec.Master.ReplicaSpec.Spec.Template.Spec.HostNetwork = true + } + } + job.Spec.Pserver.ReplicaSpec = p.parseToPserver(job) + job.Spec.Trainer.ReplicaSpec = p.parseToTrainer(job) + if useHostNetwork { + job.Spec.Pserver.ReplicaSpec.Spec.Template.Spec.HostNetwork = true + job.Spec.Trainer.ReplicaSpec.Spec.Template.Spec.HostNetwork = true + } + return job +} + +// parseToPserver generate a pserver replicaset resource according to "TrainingJob" resource specs. +func (p *DefaultJobParser) parseToPserver(job *paddlev1.TrainingJob) *v1beta1.ReplicaSet { + replicas := int32(job.Spec.Pserver.MinInstance) + command := make([]string, 2, 2) + // FIXME: refine these part. + if job.Spec.FaultTolerant { + command = []string{"paddle_k8s", "start_pserver"} + } else { + command = []string{"paddle_k8s", "start_new_pserver"} + } + + return &v1beta1.ReplicaSet{ + TypeMeta: metav1.TypeMeta{ + Kind: "extensions/v1beta1", + APIVersion: "ReplicaSet", + }, + ObjectMeta: job.ObjectMeta, + Spec: v1beta1.ReplicaSetSpec{ + Replicas: &replicas, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"paddle-job-pserver": job.ObjectMeta.Name}, + }, + Spec: corev1.PodSpec{ + Volumes: job.Spec.Volumes, + Containers: []corev1.Container{ + corev1.Container{ + Name: "pserver", + Image: job.Spec.Image, + Ports: podPorts(job), + Env: podEnv(job), + Command: command, + Resources: job.Spec.Pserver.Resources, + }, + }, + }, + }, + }, + } +} + +// parseToTrainer parse TrainingJob to a kubernetes job resource. +func (p *DefaultJobParser) parseToTrainer(job *paddlev1.TrainingJob) *batchv1.Job { + replicas := int32(job.Spec.Trainer.MinInstance) + command := make([]string, 2) + if job.Spec.FaultTolerant { + command = []string{"paddle_k8s", "start_trainer"} + } else { + command = []string{"paddle_k8s", "start_new_trainer"} + } + + return &batchv1.Job{ + TypeMeta: metav1.TypeMeta{ + Kind: "Job", + APIVersion: "batch/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: job.ObjectMeta.Name + "-trainer", + Namespace: job.ObjectMeta.Namespace, + }, + Spec: batchv1.JobSpec{ + Parallelism: &replicas, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"paddle-job": job.ObjectMeta.Name}, + }, + Spec: corev1.PodSpec{ + Volumes: job.Spec.Volumes, + Containers: []corev1.Container{ + corev1.Container{ + Name: "trainer", + Image: job.Spec.Image, + ImagePullPolicy: imagePullPolicy, + Command: command, + VolumeMounts: job.Spec.VolumeMounts, + Ports: podPorts(job), + Env: podEnv(job), + Resources: job.Spec.Trainer.Resources, + }, + }, + RestartPolicy: "Never", + }, + }, + }, + } +} + +func masterResource(job *paddlev1.TrainingJob) *corev1.ResourceRequirements { + // TODO(gongwb): config master resource? + return &corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + "cpu": *apiresource.NewQuantity(int64(2), apiresource.DecimalSI), + "memory": apiresource.MustParse("1Gi"), + }, + Requests: corev1.ResourceList{ + "cpu": *apiresource.NewQuantity(int64(1), apiresource.DecimalSI), + "memory": apiresource.MustParse("500Mi"), + }, + } +} + +func getEtcdPodSpec(job *paddlev1.TrainingJob) *corev1.Container { + command := []string{"etcd", "-name", "etcd0", + "-advertise-client-urls", "http://$(POD_IP):2379,http://$(POD_IP):4001", + "-listen-client-urls", "http://0.0.0.0:2379,http://0.0.0.0:4001", + "-initial-advertise-peer-urls", "http://$(POD_IP):2380", + "-listen-peer-urls", "http://0.0.0.0:2380", + "-initial-cluster", "etcd0=http://$(POD_IP):2380", + "-initial-cluster-state", "new"} + + return &corev1.Container{ + Name: "etcd", + Image: "quay.io/coreos/etcd:v3.2.1", + ImagePullPolicy: imagePullPolicy, + // TODO(gongwb): etcd ports? + Env: podEnv(job), + Command: command, + } +} + +// parseToMaster parse TrainingJob to a kubernetes replicaset resource. +func (p *DefaultJobParser) parseToMaster(job *paddlev1.TrainingJob) *v1beta1.ReplicaSet { + replicas := int32(1) + // FIXME: refine these part. + command := []string{"paddle_k8s", "start_master"} + + return &v1beta1.ReplicaSet{ + TypeMeta: metav1.TypeMeta{ + Kind: "extensions/v1beta1", + APIVersion: "ReplicaSet", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: job.ObjectMeta.Name + "-master", + Namespace: job.ObjectMeta.Namespace, + }, + Spec: v1beta1.ReplicaSetSpec{ + Replicas: &replicas, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"paddle-job-master": job.ObjectMeta.Name}, + }, + Spec: corev1.PodSpec{ + Volumes: job.Spec.Volumes, + Containers: []corev1.Container{ + corev1.Container{ + Name: "master", + Image: job.Spec.Image, + ImagePullPolicy: imagePullPolicy, + Ports: masterPorts(job), + Command: command, + VolumeMounts: job.Spec.VolumeMounts, + Resources: *masterResource(job), + }, + *getEtcdPodSpec(job), + }, + }, + }, + }, + } +} + +// ----------------------------------------------------------------------- +// general functions that pserver, trainer use the same +// ----------------------------------------------------------------------- +func podPorts(job *paddlev1.TrainingJob) []corev1.ContainerPort { + portsTotal := job.Spec.PortsNum + job.Spec.PortsNumForSparse + ports := make([]corev1.ContainerPort, 0) + basePort := int32(job.Spec.Port) + for i := 0; i < portsTotal; i++ { + ports = append(ports, corev1.ContainerPort{ + Name: fmt.Sprintf("jobport-%d", basePort), + ContainerPort: basePort, + }) + basePort++ + } + return ports +} + +func masterPorts(job *paddlev1.TrainingJob) []corev1.ContainerPort { + ports := []corev1.ContainerPort{ + corev1.ContainerPort{ + Name: "master-port", + ContainerPort: 8080, + }, + corev1.ContainerPort{ + Name: "etcd-port", + ContainerPort: 2379, + }, + } + return ports +} + +func podEnv(job *paddlev1.TrainingJob) []corev1.EnvVar { + needGPU := "0" + if job.NeedGPU() { + needGPU = "1" + } + trainerCount := 1 + if job.NeedGPU() { + q := job.Spec.Trainer.Resources.Requests.NvidiaGPU() + trainerCount = int(q.Value()) + } else { + q := job.Spec.Trainer.Resources.Requests.Cpu() + // FIXME: CPU resource value can be less than 1. + trainerCount = int(q.Value()) + } + + return []corev1.EnvVar{ + corev1.EnvVar{Name: "PADDLE_JOB_NAME", Value: job.ObjectMeta.Name}, + // NOTICE: TRAINERS, PSERVERS, PADDLE_INIT_NUM_GRADIENT_SERVERS + // these env are used for non-faulttolerant training, + // use min-instance all the time. When job is elastic, + // these envs are not used. + corev1.EnvVar{Name: "TRAINERS", Value: strconv.Itoa(job.Spec.Trainer.MinInstance)}, + corev1.EnvVar{Name: "PSERVERS", Value: strconv.Itoa(job.Spec.Pserver.MinInstance)}, + corev1.EnvVar{Name: "ENTRY", Value: job.Spec.Trainer.Entrypoint}, + // FIXME: TOPOLOGY deprecated + corev1.EnvVar{Name: "TOPOLOGY", Value: job.Spec.Trainer.Entrypoint}, + corev1.EnvVar{Name: "TRAINER_PACKAGE", Value: job.Spec.Trainer.Workspace}, + corev1.EnvVar{Name: "PADDLE_INIT_PORT", Value: strconv.Itoa(job.Spec.Port)}, + // PADDLE_INIT_TRAINER_COUNT should be same to gpu number when use gpu + // and cpu cores when using cpu + corev1.EnvVar{Name: "PADDLE_INIT_TRAINER_COUNT", Value: strconv.Itoa(trainerCount)}, + corev1.EnvVar{Name: "PADDLE_INIT_PORTS_NUM", Value: strconv.Itoa(job.Spec.PortsNum)}, + corev1.EnvVar{Name: "PADDLE_INIT_PORTS_NUM_FOR_SPARSE", Value: strconv.Itoa(job.Spec.PortsNumForSparse)}, + corev1.EnvVar{Name: "PADDLE_INIT_NUM_GRADIENT_SERVERS", Value: strconv.Itoa(job.Spec.Trainer.MinInstance)}, + corev1.EnvVar{Name: "PADDLE_INIT_NUM_PASSES", Value: strconv.Itoa(job.Spec.Passes)}, + corev1.EnvVar{Name: "PADDLE_INIT_USE_GPU", Value: needGPU}, + corev1.EnvVar{Name: "LD_LIBRARY_PATH", Value: "/usr/local/cuda/lib64"}, + corev1.EnvVar{Name: "NAMESPACE", ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.namespace", + }, + }}, + corev1.EnvVar{Name: "POD_IP", ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "status.podIP", + }, + }}, + } +} + +// ----------------------------------------------------------------------- +// general functions end +// ----------------------------------------------------------------------- From eec86bb37c9ef4954011e3f48a4569a3b2c732b7 Mon Sep 17 00:00:00 2001 From: m3ngyang Date: Tue, 20 Mar 2018 22:00:18 +0800 Subject: [PATCH 2/6] fix conflicts --- pkg/updater/jobparser.go | 15 ++++++--------- pkg/updater/trainingJobUpdater.go | 32 ++++++++++++++++++------------- 2 files changed, 25 insertions(+), 22 deletions(-) diff --git a/pkg/updater/jobparser.go b/pkg/updater/jobparser.go index a37b7443..e6c831ce 100644 --- a/pkg/updater/jobparser.go +++ b/pkg/updater/jobparser.go @@ -37,9 +37,6 @@ const ( type JobParser interface { Validate(job *paddlev1.TrainingJob) error ParseToTrainingJob(job *paddlev1.TrainingJob) *paddlev1.TrainingJob - parseToTrainer(job *paddlev1.TrainingJob) *batchv1.Job - parseToPserver(job *paddlev1.TrainingJob) *v1beta1.ReplicaSet - parseToMaster(job *paddlev1.TrainingJob) *v1beta1.ReplicaSet } // DefaultJobParser implement a basic JobParser. @@ -76,13 +73,13 @@ func (p *DefaultJobParser) Validate(job *paddlev1.TrainingJob) error { func (p *DefaultJobParser) ParseToTrainingJob(job *paddlev1.TrainingJob) *paddlev1.TrainingJob { useHostNetwork := job.Spec.HostNetwork if job.Spec.FaultTolerant { - job.Spec.Master.ReplicaSpec = p.parseToMaster(job) + job.Spec.Master.ReplicaSpec = parseToMaster(job) if useHostNetwork { job.Spec.Master.ReplicaSpec.Spec.Template.Spec.HostNetwork = true } } - job.Spec.Pserver.ReplicaSpec = p.parseToPserver(job) - job.Spec.Trainer.ReplicaSpec = p.parseToTrainer(job) + job.Spec.Pserver.ReplicaSpec = parseToPserver(job) + job.Spec.Trainer.ReplicaSpec = parseToTrainer(job) if useHostNetwork { job.Spec.Pserver.ReplicaSpec.Spec.Template.Spec.HostNetwork = true job.Spec.Trainer.ReplicaSpec.Spec.Template.Spec.HostNetwork = true @@ -91,7 +88,7 @@ func (p *DefaultJobParser) ParseToTrainingJob(job *paddlev1.TrainingJob) *paddle } // parseToPserver generate a pserver replicaset resource according to "TrainingJob" resource specs. -func (p *DefaultJobParser) parseToPserver(job *paddlev1.TrainingJob) *v1beta1.ReplicaSet { +func parseToPserver(job *paddlev1.TrainingJob) *v1beta1.ReplicaSet { replicas := int32(job.Spec.Pserver.MinInstance) command := make([]string, 2, 2) // FIXME: refine these part. @@ -132,7 +129,7 @@ func (p *DefaultJobParser) parseToPserver(job *paddlev1.TrainingJob) *v1beta1.Re } // parseToTrainer parse TrainingJob to a kubernetes job resource. -func (p *DefaultJobParser) parseToTrainer(job *paddlev1.TrainingJob) *batchv1.Job { +func parseToTrainer(job *paddlev1.TrainingJob) *batchv1.Job { replicas := int32(job.Spec.Trainer.MinInstance) command := make([]string, 2) if job.Spec.FaultTolerant { @@ -211,7 +208,7 @@ func getEtcdPodSpec(job *paddlev1.TrainingJob) *corev1.Container { } // parseToMaster parse TrainingJob to a kubernetes replicaset resource. -func (p *DefaultJobParser) parseToMaster(job *paddlev1.TrainingJob) *v1beta1.ReplicaSet { +func parseToMaster(job *paddlev1.TrainingJob) *v1beta1.ReplicaSet { replicas := int32(1) // FIXME: refine these part. command := []string{"paddle_k8s", "start_master"} diff --git a/pkg/updater/trainingJobUpdater.go b/pkg/updater/trainingJobUpdater.go index 7cb89ff5..7770e18f 100644 --- a/pkg/updater/trainingJobUpdater.go +++ b/pkg/updater/trainingJobUpdater.go @@ -2,6 +2,9 @@ package updater import ( "fmt" + "reflect" + "time" + log "github.com/golang/glog" padv1 "github.com/paddlepaddle/edl/pkg/apis/paddlepaddle/v1" trainingJobClient "github.com/paddlepaddle/edl/pkg/client/clientset/versioned" @@ -9,8 +12,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" - "reflect" - "time" ) const ( @@ -96,9 +97,9 @@ func (updater *TrainingJobUpdater) Modify(nj *padv1.TrainingJob) { func (updater *TrainingJobUpdater) releaseResource(tp padv1.TrainingResourceType) error { resource := new(v1beta1.ReplicaSet) switch tp { - case padv1.MASTER: + case padv1.Master: resource = updater.job.Spec.Master.ReplicaSpec - case padv1.PSERVER: + case padv1.Pserver: resource = updater.job.Spec.Pserver.ReplicaSpec default: return fmt.Errorf("unknow resource") @@ -131,11 +132,11 @@ func (updater *TrainingJobUpdater) releaseResource(tp padv1.TrainingResourceType } func (updater *TrainingJobUpdater) releaseMaster() error { - return updater.releaseResource(padv1.MASTER) + return updater.releaseResource(padv1.Master) } func (updater *TrainingJobUpdater) releasePserver() error { - return updater.releaseResource(padv1.PSERVER) + return updater.releaseResource(padv1.Pserver) } func (updater *TrainingJobUpdater) releaseTrainer() error { @@ -206,12 +207,12 @@ func (updater *TrainingJobUpdater) deleteTrainingJob() error { func (updater *TrainingJobUpdater) createResource(tp padv1.TrainingResourceType) error { resource := new(v1beta1.ReplicaSet) switch tp { - case padv1.MASTER: + case padv1.Master: resource = updater.job.Spec.Master.ReplicaSpec - case padv1.PSERVER: + case padv1.Pserver: resource = updater.job.Spec.Pserver.ReplicaSpec default: - return fmt.Errorf("unknow resource") + return fmt.Errorf("unknown resource") } for { _, err := updater.kubeClient.ExtensionsV1beta1().ReplicaSets(updater.job.Namespace).Get(resource.Name, v1.GetOptions{}) @@ -279,11 +280,11 @@ func (updater *TrainingJobUpdater) createTrainer() error { func (updater *TrainingJobUpdater) createTrainingJob() error { if updater.job.Spec.FaultTolerant { - if err := updater.createResource(padv1.MASTER); err != nil { + if err := updater.createResource(padv1.Master); err != nil { return err } } - if err := updater.createResource(padv1.PSERVER); err != nil { + if err := updater.createResource(padv1.Pserver); err != nil { return err } return updater.createTrainer() @@ -312,7 +313,12 @@ func (updater *TrainingJobUpdater) parseTrainingJob() { } err := func() error { - // TODO(Zhengqi): Parse TrainingJob, this will be submitted in the next pr + var parser DefaultJobParser + if err := parser.Validate(updater.job); err != nil { + return err + } + + updater.job = parser.ParseToTrainingJob(updater.job) return nil }() @@ -328,7 +334,7 @@ func (updater *TrainingJobUpdater) parseTrainingJob() { func (updater *TrainingJobUpdater) getTrainerReplicaStatuses() ([]*padv1.TrainingResourceStatus, error) { var replicaStatuses []*padv1.TrainingResourceStatus trs := padv1.TrainingResourceStatus{ - TrainingResourceType: padv1.TRAINER, + TrainingResourceType: padv1.Trainer, State: padv1.ResourceStateNone, ResourceStates: make(map[padv1.ResourceState]int), } From d1038b51a3cf4466b55f15622efbb96b326a0f2c Mon Sep 17 00:00:00 2001 From: m3ngyang Date: Fri, 23 Mar 2018 13:53:13 +0800 Subject: [PATCH 3/6] fix metaobject of pserver --- pkg/updater/jobparser.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/updater/jobparser.go b/pkg/updater/jobparser.go index e6c831ce..7e5db555 100644 --- a/pkg/updater/jobparser.go +++ b/pkg/updater/jobparser.go @@ -103,7 +103,10 @@ func parseToPserver(job *paddlev1.TrainingJob) *v1beta1.ReplicaSet { Kind: "extensions/v1beta1", APIVersion: "ReplicaSet", }, - ObjectMeta: job.ObjectMeta, + ObjectMeta: metav1.ObjectMeta{ + Name: job.ObjectMeta.Name + "-pserver", + Namespace: job.ObjectMeta.Namespace, + }, Spec: v1beta1.ReplicaSetSpec{ Replicas: &replicas, Template: corev1.PodTemplateSpec{ From 65dd4562d707504b8ec42e8ab54aa52df9b9f29c Mon Sep 17 00:00:00 2001 From: m3ngyang Date: Fri, 30 Mar 2018 19:11:24 +0800 Subject: [PATCH 4/6] redefine generation function of job parser --- pkg/apis/paddlepaddle/v1/trainingjob.go | 7 ++++-- pkg/apis/paddlepaddle/v1/types.go | 1 + pkg/updater/jobparser.go | 30 ++++++++++++------------- pkg/updater/trainingJobUpdater.go | 16 +++++-------- 4 files changed, 25 insertions(+), 29 deletions(-) diff --git a/pkg/apis/paddlepaddle/v1/trainingjob.go b/pkg/apis/paddlepaddle/v1/trainingjob.go index eb2e2c1b..41ef6b76 100644 --- a/pkg/apis/paddlepaddle/v1/trainingjob.go +++ b/pkg/apis/paddlepaddle/v1/trainingjob.go @@ -1,6 +1,9 @@ package v1 -import "encoding/json" +import ( + "encoding/json" + "fmt" +) // Elastic returns true if the job can scale to more workers. func (s *TrainingJob) Elastic() bool { @@ -25,5 +28,5 @@ func (s *TrainingJob) NeedGPU() bool { func (s *TrainingJob) String() string { b, _ := json.MarshalIndent(s, "", " ") - return string(b[:]) + return fmt.Sprintf("%s", b) } diff --git a/pkg/apis/paddlepaddle/v1/types.go b/pkg/apis/paddlepaddle/v1/types.go index 3b91280d..ca88b3b5 100644 --- a/pkg/apis/paddlepaddle/v1/types.go +++ b/pkg/apis/paddlepaddle/v1/types.go @@ -55,6 +55,7 @@ type TrainingJobSpec struct { Passes int `json:"passes,omitempty"` Volumes []corev1.Volume `json:"volumes"` VolumeMounts []corev1.VolumeMount `json:"VolumeMounts"` + //TODO simplify the structure of sub-resource(mengyang) //TrainingJob components. Master MasterSpec `json:"master"` Pserver PserverSpec `json:"pserver"` diff --git a/pkg/updater/jobparser.go b/pkg/updater/jobparser.go index 7e5db555..462a333c 100644 --- a/pkg/updater/jobparser.go +++ b/pkg/updater/jobparser.go @@ -32,18 +32,12 @@ const ( imagePullPolicy = "Always" ) -// JobParser is a interface can parse given simple TrainingJob struct to -// an integrated TrainingJob -type JobParser interface { - Validate(job *paddlev1.TrainingJob) error - ParseToTrainingJob(job *paddlev1.TrainingJob) *paddlev1.TrainingJob -} - // DefaultJobParser implement a basic JobParser. -type DefaultJobParser int +type DefaultJobParser struct { +} -// Validate updates default values for the added job and validates the fields. -func (p *DefaultJobParser) Validate(job *paddlev1.TrainingJob) error { +// setDefaultAndValidate updates default values for the added job and validates the fields. +func setDefaultAndValidate(job *paddlev1.TrainingJob) error { // Fill in default values // FIXME: Need to test. What is the value if specified "omitempty" if job.Spec.Port == 0 { @@ -65,12 +59,16 @@ func (p *DefaultJobParser) Validate(job *paddlev1.TrainingJob) error { if !job.Spec.FaultTolerant && job.Elastic() { return errors.New("max-instances should equal to min-instances when fault_tolerant is disabled") } - // TODO: add validations. + // TODO: add validations.(helin) return nil } -// ParseToTrainingJob generates a whole structure of TrainingJob -func (p *DefaultJobParser) ParseToTrainingJob(job *paddlev1.TrainingJob) *paddlev1.TrainingJob { +// NewTrainingJob generates a whole structure of TrainingJob +func (p *DefaultJobParser) NewTrainingJob(job *paddlev1.TrainingJob) (*paddlev1.TrainingJob, error) { + if err := setDefaultAndValidate(job); err != nil { + return nil, err + } + useHostNetwork := job.Spec.HostNetwork if job.Spec.FaultTolerant { job.Spec.Master.ReplicaSpec = parseToMaster(job) @@ -84,14 +82,14 @@ func (p *DefaultJobParser) ParseToTrainingJob(job *paddlev1.TrainingJob) *paddle job.Spec.Pserver.ReplicaSpec.Spec.Template.Spec.HostNetwork = true job.Spec.Trainer.ReplicaSpec.Spec.Template.Spec.HostNetwork = true } - return job + return job, nil } // parseToPserver generate a pserver replicaset resource according to "TrainingJob" resource specs. func parseToPserver(job *paddlev1.TrainingJob) *v1beta1.ReplicaSet { replicas := int32(job.Spec.Pserver.MinInstance) command := make([]string, 2, 2) - // FIXME: refine these part. + // FIXME: refine these part.(typhoonzero) if job.Spec.FaultTolerant { command = []string{"paddle_k8s", "start_pserver"} } else { @@ -213,7 +211,7 @@ func getEtcdPodSpec(job *paddlev1.TrainingJob) *corev1.Container { // parseToMaster parse TrainingJob to a kubernetes replicaset resource. func parseToMaster(job *paddlev1.TrainingJob) *v1beta1.ReplicaSet { replicas := int32(1) - // FIXME: refine these part. + // FIXME: refine these part.(typhoonzero) command := []string{"paddle_k8s", "start_master"} return &v1beta1.ReplicaSet{ diff --git a/pkg/updater/trainingJobUpdater.go b/pkg/updater/trainingJobUpdater.go index 7770e18f..9679db09 100644 --- a/pkg/updater/trainingJobUpdater.go +++ b/pkg/updater/trainingJobUpdater.go @@ -312,19 +312,13 @@ func (updater *TrainingJobUpdater) parseTrainingJob() { return } - err := func() error { - var parser DefaultJobParser - if err := parser.Validate(updater.job); err != nil { - return err - } + var parser DefaultJobParser + var creatErr error + updater.job, creatErr = parser.NewTrainingJob(updater.job) - updater.job = parser.ParseToTrainingJob(updater.job) - return nil - }() - - if err != nil { + if creatErr != nil { updater.status.Phase = padv1.TrainingJobPhaseFailed - updater.status.Reason = err.Error() + updater.status.Reason = creatErr.Error() } else { updater.status.Phase = padv1.TrainingJobPhaseCreating updater.status.Reason = "" From 499e1fb9f95d13c2705f8fb7a0faa9c60835fab6 Mon Sep 17 00:00:00 2001 From: m3ngyang Date: Wed, 4 Apr 2018 20:18:14 +0800 Subject: [PATCH 5/6] fix code style convention problem --- pkg/apis/paddlepaddle/v1/{trainingjob.go => training_job.go} | 0 pkg/apis/paddlepaddle/v1/types.go | 2 +- pkg/updater/jobparser.go | 4 ---- pkg/updater/trainingJobUpdater.go | 2 ++ 4 files changed, 3 insertions(+), 5 deletions(-) rename pkg/apis/paddlepaddle/v1/{trainingjob.go => training_job.go} (100%) diff --git a/pkg/apis/paddlepaddle/v1/trainingjob.go b/pkg/apis/paddlepaddle/v1/training_job.go similarity index 100% rename from pkg/apis/paddlepaddle/v1/trainingjob.go rename to pkg/apis/paddlepaddle/v1/training_job.go diff --git a/pkg/apis/paddlepaddle/v1/types.go b/pkg/apis/paddlepaddle/v1/types.go index ca88b3b5..0d4c6867 100644 --- a/pkg/apis/paddlepaddle/v1/types.go +++ b/pkg/apis/paddlepaddle/v1/types.go @@ -55,7 +55,7 @@ type TrainingJobSpec struct { Passes int `json:"passes,omitempty"` Volumes []corev1.Volume `json:"volumes"` VolumeMounts []corev1.VolumeMount `json:"VolumeMounts"` - //TODO simplify the structure of sub-resource(mengyang) + //TODO(m3ngyang) simplify the structure of sub-resource(mengyang) //TrainingJob components. Master MasterSpec `json:"master"` Pserver PserverSpec `json:"pserver"` diff --git a/pkg/updater/jobparser.go b/pkg/updater/jobparser.go index 462a333c..2617781f 100644 --- a/pkg/updater/jobparser.go +++ b/pkg/updater/jobparser.go @@ -249,9 +249,7 @@ func parseToMaster(job *paddlev1.TrainingJob) *v1beta1.ReplicaSet { } } -// ----------------------------------------------------------------------- // general functions that pserver, trainer use the same -// ----------------------------------------------------------------------- func podPorts(job *paddlev1.TrainingJob) []corev1.ContainerPort { portsTotal := job.Spec.PortsNum + job.Spec.PortsNumForSparse ports := make([]corev1.ContainerPort, 0) @@ -330,6 +328,4 @@ func podEnv(job *paddlev1.TrainingJob) []corev1.EnvVar { } } -// ----------------------------------------------------------------------- // general functions end -// ----------------------------------------------------------------------- diff --git a/pkg/updater/trainingJobUpdater.go b/pkg/updater/trainingJobUpdater.go index 9679db09..11a36b72 100644 --- a/pkg/updater/trainingJobUpdater.go +++ b/pkg/updater/trainingJobUpdater.go @@ -6,8 +6,10 @@ import ( "time" log "github.com/golang/glog" + padv1 "github.com/paddlepaddle/edl/pkg/apis/paddlepaddle/v1" trainingJobClient "github.com/paddlepaddle/edl/pkg/client/clientset/versioned" + "k8s.io/api/extensions/v1beta1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1" From 39f13809b522189e1717b9ae4be4953652b17077 Mon Sep 17 00:00:00 2001 From: m3ngyang Date: Wed, 11 Apr 2018 11:01:44 +0800 Subject: [PATCH 6/6] fix definition of command variable --- pkg/updater/jobparser.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/updater/jobparser.go b/pkg/updater/jobparser.go index 2617781f..08de0afe 100644 --- a/pkg/updater/jobparser.go +++ b/pkg/updater/jobparser.go @@ -88,7 +88,7 @@ func (p *DefaultJobParser) NewTrainingJob(job *paddlev1.TrainingJob) (*paddlev1. // parseToPserver generate a pserver replicaset resource according to "TrainingJob" resource specs. func parseToPserver(job *paddlev1.TrainingJob) *v1beta1.ReplicaSet { replicas := int32(job.Spec.Pserver.MinInstance) - command := make([]string, 2, 2) + var command []string // FIXME: refine these part.(typhoonzero) if job.Spec.FaultTolerant { command = []string{"paddle_k8s", "start_pserver"} @@ -132,7 +132,7 @@ func parseToPserver(job *paddlev1.TrainingJob) *v1beta1.ReplicaSet { // parseToTrainer parse TrainingJob to a kubernetes job resource. func parseToTrainer(job *paddlev1.TrainingJob) *batchv1.Job { replicas := int32(job.Spec.Trainer.MinInstance) - command := make([]string, 2) + var command []string if job.Spec.FaultTolerant { command = []string{"paddle_k8s", "start_trainer"} } else {