Skip to content

Commit

Permalink
Add putfiles command and add parse functions on controller. (#496)
Browse files Browse the repository at this point in the history
Add `putfiles` command and add `parse` functions on controller
  • Loading branch information
gongweibao committed Nov 16, 2017
1 parent ef384e9 commit 1c02c74
Show file tree
Hide file tree
Showing 5 changed files with 405 additions and 36 deletions.
10 changes: 2 additions & 8 deletions go/cmd/paddlectl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,8 @@ func main() {
subcommands.Register(subcommands.CommandsCommand(), "")
subcommands.Register(&paddlectl.SubmitCmd{}, "")

// TODO(gongwb): add these commands.
// subcommands.Register(&paddlecloud.LogsCommand{}, "")
// subcommands.Register(&paddlecloud.GetCommand{}, "")
// subcommands.Register(&paddlecloud.KillCommand{}, "")
// subcommands.Register(&paddlecloud.SimpleFileCmd{}, "")
// subcommands.Register(&paddlecloud.RegistryCmd{}, "")
// subcommands.Register(&paddlecloud.DeleteCommand{}, "")
// subcommands.Register(&paddlecloud.PublishCmd{}, "")
// TODO(gongwb): add more commands.
subcommands.Register(&paddlectl.SimpleFileCmd{}, "")

flag.Parse()
ctx := context.Background()
Expand Down
25 changes: 21 additions & 4 deletions go/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package controller

import (
"context"
"encoding/json"

log "github.com/inconshreveable/log15"

Expand Down Expand Up @@ -104,10 +105,26 @@ func (c *Controller) onAdd(obj interface{}) {
job := obj.(*paddlejob.TrainingJob)
log.Debug("TrainingJob resource added", "name", job.ObjectMeta.Name)
c.autoscaler.OnAdd(job)
// TODO: if we need to create training job instance by the resource,
// you should add the following code:
// var parser DefaultJobParser
// c.clientset.ExtensionsV1beta1().ReplicaSets(namespace).Create(parser.ParseToPserver(job))

// TODO(gongwb): enable this once everything is ready.
// "Everything is ready" means:
//   - training jobs can be created from paddlectl
//   - the scheduler can schedule training jobs
var parser DefaultJobParser
p := parser.ParseToPserver(job)
t := parser.ParseToTrainer(job)
m := parser.ParseToMaster(job)

b, _ := json.MarshalIndent(p, "", " ")
log.Debug("create pserver:" + string(b))

b, _ = json.MarshalIndent(t, "", " ")
log.Debug("create trainer-job:" + string(b))

b, _ = json.MarshalIndent(m, "", " ")
log.Debug("create master:" + string(b))

// TODO(gongwb): create them
}

func (c *Controller) onUpdate(oldObj, newObj interface{}) {
Expand Down
170 changes: 159 additions & 11 deletions go/controller/jobparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@ import (
"strconv"

paddlejob "github.com/PaddlePaddle/cloud/go/api"
apiresource "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/pkg/api/v1"
batchv1 "k8s.io/client-go/pkg/apis/batch/v1"
v1beta1 "k8s.io/client-go/pkg/apis/extensions/v1beta1"
)

// imagePullPolicy is the image pull policy applied to the containers
// generated by the job parser.
const imagePullPolicy = "Always"

// JobParser is an interface that can parse a "TrainingJob" into a
// ReplicaSet and a Job.
type JobParser interface {
Expand Down Expand Up @@ -93,7 +98,7 @@ func (p *DefaultJobParser) ParseToPserver(job *paddlejob.TrainingJob) *v1beta1.R
Volumes: podVolumes(job),
Containers: []v1.Container{
v1.Container{
Name: job.ObjectMeta.Name,
Name: "pserver",
Image: job.Spec.Image,
Ports: podPorts(job),
Env: podEnv(job),
Expand All @@ -109,14 +114,124 @@ func (p *DefaultJobParser) ParseToPserver(job *paddlejob.TrainingJob) *v1beta1.R

// ParseToTrainer parses a TrainingJob into a Kubernetes batch Job resource
// describing the trainer pods.
//
// The Job is named "<job name>-trainer", lives in the TrainingJob's
// namespace, and runs job.Spec.Trainer.MinInstance pods in parallel. Each
// pod carries the label "paddle-job" set to the TrainingJob name and is
// never restarted.
func (p *DefaultJobParser) ParseToTrainer(job *paddlejob.TrainingJob) *batchv1.Job {
	replicas := int32(job.Spec.Trainer.MinInstance)

	// Select the trainer start command based on the fault-tolerant flag.
	// NOTE(review): confirm that fault-tolerant jobs are meant to use
	// "start_trainer" and the others "start_new_trainer".
	command := []string{"paddle_k8s", "start_new_trainer"}
	if job.Spec.FaultTolerant {
		command = []string{"paddle_k8s", "start_trainer"}
	}

	return &batchv1.Job{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Job",
			APIVersion: "batch/v1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      job.ObjectMeta.Name + "-trainer",
			Namespace: job.ObjectMeta.Namespace,
		},
		Spec: batchv1.JobSpec{
			Parallelism: &replicas,
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{"paddle-job": job.ObjectMeta.Name},
				},
				Spec: v1.PodSpec{
					Volumes: podVolumes(job),
					Containers: []v1.Container{
						{
							Name:            "trainer",
							Image:           job.Spec.Image,
							ImagePullPolicy: imagePullPolicy,
							Command:         command,
							VolumeMounts:    podVolumeMounts(job),
							Ports:           podPorts(job),
							Env:             podEnv(job),
							Resources:       job.Spec.Trainer.Resources,
						},
					},
					RestartPolicy: "Never",
				},
			},
		},
	}
}

// masterResource returns the fixed resource requirements used for the
// master container.
//
// Requests are kept at or below the limits: Kubernetes rejects a container
// whose request for a resource exceeds its limit for that resource.
func masterResource(job *paddlejob.TrainingJob) *v1.ResourceRequirements {
	// TODO(gongwb): config master resource?
	return &v1.ResourceRequirements{
		Limits: v1.ResourceList{
			"cpu":    *apiresource.NewQuantity(int64(2), apiresource.DecimalSI),
			"memory": apiresource.MustParse("1Gi"),
		},
		Requests: v1.ResourceList{
			"cpu":    *apiresource.NewQuantity(int64(1), apiresource.DecimalSI),
			"memory": apiresource.MustParse("500Mi"),
		},
	}
}

// getEtcdPodSpec builds the container definition for a single-member etcd
// instance named "etcd0". The command line refers to $(POD_IP), so the
// container is given the environment produced by podEnv(job).
func getEtcdPodSpec(job *paddlejob.TrainingJob) *v1.Container {
	etcd := v1.Container{
		Name:            "etcd",
		Image:           "quay.io/coreos/etcd:v3.2.1",
		ImagePullPolicy: imagePullPolicy,
		// TODO(gongwb): etcd ports?
		Env: podEnv(job),
		Command: []string{
			"etcd",
			"-name", "etcd0",
			"-advertise-client-urls", "http://$(POD_IP):2379,http://$(POD_IP):4001",
			"-listen-client-urls", "http://0.0.0.0:2379,http://0.0.0.0:4001",
			"-initial-advertise-peer-urls", "http://$(POD_IP):2380",
			"-listen-peer-urls", "http://0.0.0.0:2380",
			"-initial-cluster", "etcd0=http://$(POD_IP):2380",
			"-initial-cluster-state", "new",
		},
	}
	return &etcd
}

// ParseToMaster parses a TrainingJob into a Kubernetes ReplicaSet resource
// describing the master pod.
//
// The ReplicaSet is named "<job name>-master", lives in the TrainingJob's
// namespace, and keeps exactly one replica. The pod is labelled
// "paddle-job-master" with the TrainingJob name and holds two containers:
// the master itself and the etcd container from getEtcdPodSpec.
func (p *DefaultJobParser) ParseToMaster(job *paddlejob.TrainingJob) *v1beta1.ReplicaSet {
	replicas := int32(1)
	// FIXME: refine this part.
	command := []string{"paddle_k8s", "start_master"}

	return &v1beta1.ReplicaSet{
		TypeMeta: metav1.TypeMeta{
			// Kind is the resource type; APIVersion is its group/version.
			Kind:       "ReplicaSet",
			APIVersion: "extensions/v1beta1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      job.ObjectMeta.Name + "-master",
			Namespace: job.ObjectMeta.Namespace,
		},
		Spec: v1beta1.ReplicaSetSpec{
			Replicas: &replicas,
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{"paddle-job-master": job.ObjectMeta.Name},
				},
				Spec: v1.PodSpec{
					// TODO: setup pserver volumes on cloud.
					Volumes: podVolumes(job),
					Containers: []v1.Container{
						{
							Name:            "master",
							Image:           job.Spec.Image,
							ImagePullPolicy: imagePullPolicy,
							Ports:           masterPorts(job),
							// TODO(gongwb): master env
							Command:      command,
							VolumeMounts: podVolumeMounts(job),
							Resources:    *masterResource(job),
						},
						*getEtcdPodSpec(job),
					},
				},
			},
		},
	}
}

// -----------------------------------------------------------------------
Expand All @@ -133,7 +248,21 @@ func podPorts(job *paddlejob.TrainingJob) []v1.ContainerPort {
})
basePort++
}
return []v1.ContainerPort{}
return ports
}

// masterPorts lists the container ports declared on the master container:
// "master-port" (8080) and "etcd-port" (2379). The job argument is
// currently unused.
func masterPorts(job *paddlejob.TrainingJob) []v1.ContainerPort {
	return []v1.ContainerPort{
		{Name: "master-port", ContainerPort: 8080},
		{Name: "etcd-port", ContainerPort: 2379},
	}
}

func podEnv(job *paddlejob.TrainingJob) []v1.EnvVar {
Expand All @@ -150,6 +279,7 @@ func podEnv(job *paddlejob.TrainingJob) []v1.EnvVar {
// FIXME: CPU resource value can be less than 1.
trainerCount = int(q.Value())
}

return []v1.EnvVar{
v1.EnvVar{Name: "PADDLE_JOB_NAME", Value: job.ObjectMeta.Name},
// NOTICE: TRAINERS, PSERVERS, PADDLE_INIT_NUM_GRADIENT_SERVERS
Expand All @@ -171,23 +301,41 @@ func podEnv(job *paddlejob.TrainingJob) []v1.EnvVar {
v1.EnvVar{Name: "PADDLE_INIT_NUM_GRADIENT_SERVERS", Value: strconv.Itoa(job.Spec.Trainer.MinInstance)},
v1.EnvVar{Name: "PADDLE_INIT_NUM_PASSES", Value: strconv.Itoa(job.Spec.Passes)},
v1.EnvVar{Name: "PADDLE_INIT_USE_GPU", Value: needGPU},
v1.EnvVar{Name: "LD_LIBRARY_PATH", Value: job.Spec.Trainer.Entrypoint},
v1.EnvVar{Name: "LD_LIBRARY_PATH", Value: "/usr/local/cuda/lib64"},
v1.EnvVar{Name: "NAMESPACE", ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
}},
v1.EnvVar{Name: "POD_IP", ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
}},
}
}

// podVolumes returns the volumes attached to the pods generated for a job.
//
// It currently yields a single host-path volume named
// "<job name>-workspace" whose path is job.Spec.Trainer.Workspace. The
// name matches the one used by podVolumeMounts.
func podVolumes(job *paddlejob.TrainingJob) []v1.Volume {
	return []v1.Volume{
		{
			Name: job.ObjectMeta.Name + "-workspace",
			// TODO(gongwb): add support to ceph fs and mount public path.
			VolumeSource: v1.VolumeSource{
				HostPath: &v1.HostPathVolumeSource{
					Path: job.Spec.Trainer.Workspace,
				},
			},
		},
	}
}

// podVolumeMounts returns the volume mounts for the containers generated
// for a job.
//
// It mounts the volume named "<job name>-workspace" (the name produced by
// podVolumes) at job.Spec.Trainer.Workspace inside the container.
func podVolumeMounts(job *paddlejob.TrainingJob) []v1.VolumeMount {
	return []v1.VolumeMount{
		{
			Name:      job.ObjectMeta.Name + "-workspace",
			MountPath: job.Spec.Trainer.Workspace,
		},
	}
}

// -----------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 1c02c74

Please sign in to comment.