diff --git a/Dockerfile b/Dockerfile
index 87926d2f..b3c1ffae 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -2,8 +2,14 @@ FROM golang:1.8
 RUN go get github.com/Masterminds/glide
 RUN apt-get update && apt-get install -y git
 WORKDIR $GOPATH/src/github.com/paddlepaddle
-RUN git clone https://github.com/paddlepaddle/edl.git
+RUN mkdir -p $GOPATH/src/github.com/paddlepaddle/edl
+# Add ENV http_proxy=[your proxy server] if needed
+# Run glide install before adding the Go sources, so that
+# rebuilding the image after a code change takes little time
+ADD ./glide.yaml ./glide.lock $GOPATH/src/github.com/paddlepaddle/edl/
 WORKDIR $GOPATH/src/github.com/paddlepaddle/edl
 RUN glide install --strip-vendor
+ADD . $GOPATH/src/github.com/paddlepaddle/edl
 RUN go build -o /usr/local/bin/edl github.com/paddlepaddle/edl/cmd/edl
+RUN rm -rf $GOPATH/src/github.com/paddlepaddle/edl
 CMD ["edl"]
diff --git a/cmd/edl/edl.go b/cmd/edl/edl.go
index 197b7016..02b36e42 100644
--- a/cmd/edl/edl.go
+++ b/cmd/edl/edl.go
@@ -24,7 +24,7 @@ func main() {
     candy.Must(err)
 
     log.Root().SetHandler(
-        log.LvlFilterHandler(lvl, log.CallerStackHandler("%+v", log.StderrHandler)),
+        log.LvlFilterHandler(lvl, log.CallerFileHandler(log.StderrHandler)),
     )
 
     // Create the client config. Use kubeconfig if given, otherwise assume in-cluster.
diff --git a/edl_controller.yaml b/edl_controller.yaml
new file mode 100644
index 00000000..ab32175e
--- /dev/null
+++ b/edl_controller.yaml
@@ -0,0 +1,21 @@
+apiVersion: extensions/v1beta1
+kind: Deployment
+metadata:
+  name: training-job-controller
+  namespace: paddlecloud
+spec:
+  replicas: 1
+  template:
+    metadata:
+      labels:
+        name: training-job-controller
+    spec:
+      containers:
+      - name: training-job-controller
+        image: paddlepaddle/edl-controller
+        env:
+        - name: https_proxy
+          value: ""
+        - name: http_proxy
+          value: ""
+        command: ["/usr/local/bin/edl", "-logtostderr", "-log_level", "info"]
diff --git a/example/Dockerfile b/example/Dockerfile
new file mode 100644
index 00000000..4edf29d7
--- /dev/null
+++ b/example/Dockerfile
@@ -0,0 +1,4 @@
+FROM paddlepaddle/paddlecloud-job
+RUN mkdir -p /workspace
+ADD train_ft.py /workspace
+
diff --git a/example/examplejob.yaml b/example/examplejob.yaml
new file mode 100644
index 00000000..c183e157
--- /dev/null
+++ b/example/examplejob.yaml
@@ -0,0 +1,34 @@
+apiVersion: paddlepaddle.org/v1
+kind: TrainingJob
+metadata:
+  name: example
+spec:
+  image: "paddlepaddle/paddlecloud-job"
+  port: 7164
+  ports_num: 1
+  ports_num_for_sparse: 1
+  fault_tolerant: true
+  trainer:
+    entrypoint: "python /workspace/vgg16_v2.py"
+    workspace: "/workspace"
+    passes: 50
+    min-instance: 2
+    max-instance: 6
+    resources:
+      limits:
+        #alpha.kubernetes.io/nvidia-gpu: 1
+        cpu: "200m"
+        memory: "200Mi"
+      requests:
+        cpu: "200m"
+        memory: "200Mi"
+  pserver:
+    min-instance: 2
+    max-instance: 2
+    resources:
+      limits:
+        cpu: "800m"
+        memory: "1Gi"
+      requests:
+        cpu: "500m"
+        memory: "600Mi"
diff --git a/example/train_ft.py b/example/train_ft.py
new file mode 100644
index 00000000..aba6d870
--- /dev/null
+++ b/example/train_ft.py
@@ -0,0 +1,140 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from PIL import Image
+import numpy as np
+import paddle.v2 as paddle
+import paddle.v2.dataset.common as common
+import paddle.v2.dataset as dataset
+import os
+import sys
+
+TRAINER_ID = int(os.getenv("PADDLE_INIT_TRAINER_ID", "-1"))
+TOTAL_TRAINERS = int(os.getenv("PADDLE_INIT_NUM_GRADIENT_SERVERS", "-1"))
+
+
+def softmax_regression(img):
+    predict = paddle.layer.fc(input=img,
+                              size=10,
+                              act=paddle.activation.Softmax())
+    return predict
+
+
+def multilayer_perceptron(img):
+    # The first fully-connected layer
+    hidden1 = paddle.layer.fc(input=img,
+                              size=128,
+                              act=paddle.activation.Relu())
+    # The second fully-connected layer and the corresponding activation function
+    hidden2 = paddle.layer.fc(input=hidden1,
+                              size=64,
+                              act=paddle.activation.Relu())
+    # The third fully-connected layer; note that the hidden size should be 10,
+    # which is the number of unique digits
+    predict = paddle.layer.fc(input=hidden2,
+                              size=10,
+                              act=paddle.activation.Softmax())
+    return predict
+
+
+def convolutional_neural_network(img):
+    # first conv layer
+    conv_pool_1 = paddle.networks.simple_img_conv_pool(
+        input=img,
+        filter_size=5,
+        num_filters=20,
+        num_channel=1,
+        pool_size=2,
+        pool_stride=2,
+        act=paddle.activation.Relu())
+    # second conv layer
+    conv_pool_2 = paddle.networks.simple_img_conv_pool(
+        input=conv_pool_1,
+        filter_size=5,
+        num_filters=50,
+        num_channel=20,
+        pool_size=2,
+        pool_stride=2,
+        act=paddle.activation.Relu())
+    # fully-connected layer
+    predict = paddle.layer.fc(input=conv_pool_2,
+                              size=10,
+                              act=paddle.activation.Softmax())
+    return predict
+
+
+def main():
+    etcd_ip = os.getenv("ETCD_IP")
+    etcd_endpoint = "http://" + etcd_ip + ":" + "2379"
+    paddle.init()
+
+    # define network topology
+    images = paddle.layer.data(
+        name='pixel', type=paddle.data_type.dense_vector(784))
+    label = paddle.layer.data(
+        name='label', type=paddle.data_type.integer_value(10))
+
+    # Here we can build the prediction network in different ways. Please
+    # choose one by uncommenting the corresponding line.
+    # predict = softmax_regression(images)
+    # predict = multilayer_perceptron(images)
+    predict = convolutional_neural_network(images)
+
+    cost = paddle.layer.classification_cost(input=predict, label=label)
+
+    parameters = paddle.parameters.create(cost)
+
+    optimizer = paddle.optimizer.Momentum(
+        learning_rate=0.1 / 128.0,
+        momentum=0.9,
+        regularization=paddle.optimizer.L2Regularization(rate=0.0005 * 128))
+
+    trainer = paddle.trainer.SGD(cost=cost,
+                                 parameters=parameters,
+                                 update_equation=optimizer,
+                                 is_local=False,
+                                 pserver_spec=etcd_endpoint,
+                                 use_etcd=True)
+
+    def event_handler(event):
+        if isinstance(event, paddle.event.EndIteration):
+            if event.batch_id % 100 == 0:
+                print "Pass %d, Batch %d, Cost %f, %s" % (
+                    event.pass_id, event.batch_id, event.cost, event.metrics)
+        if isinstance(event, paddle.event.EndPass):
+            result = trainer.test(reader=paddle.batch(
+                dataset.mnist.test(),
+                batch_size=2))
+            print "Test with Pass %d, Cost %f, %s\n" % (
+                event.pass_id, result.cost, result.metrics)
+
+    trainer.train(
+        reader=paddle.batch(
+            dataset.mnist.train(),
+            batch_size=128),
+        event_handler=event_handler,
+        num_passes=5)
+
+
+if __name__ == '__main__':
+    usage = "python train_ft.py [prepare|train]"
+    if len(sys.argv) != 2:
+        print usage
+        exit(1)
+
+    if TRAINER_ID == -1 or TOTAL_TRAINERS == -1:
+        print "no cloud environment found, must run on cloud"
+        exit(1)
+    if sys.argv[1] == "train":
+        main()
diff --git a/pkg/controller.go b/pkg/controller.go
index cd00868e..21e32f02 100644
--- a/pkg/controller.go
+++ b/pkg/controller.go
@@ -117,20 +117,34 @@ func (c *Controller) onAdd(obj interface{}) {
     // create trainjob from paddlectl
     // scheduler can schedule trainjobs
     var parser DefaultJobParser
+    m := parser.ParseToMaster(job)
     p := parser.ParseToPserver(job)
     t := parser.ParseToTrainer(job)
-    m := parser.ParseToMaster(job)
 
-    b, _ := json.MarshalIndent(p, "", " ")
+    b, _ := json.MarshalIndent(m, "", " ")
+    log.Debug("create master:" + string(b))
+
+    b, _ = json.MarshalIndent(p, "", " ")
     log.Debug("create pserver:" + string(b))
 
     b, _ = json.MarshalIndent(t, "", " ")
     log.Debug("create trainer-job:" + string(b))
 
-    b, _ = json.MarshalIndent(m, "", " ")
-    log.Debug("create master:" + string(b))
-
-    // TODO(gongwb): create them
+    // create all resources
+    _, err := c.clientset.ExtensionsV1beta1().ReplicaSets(m.ObjectMeta.Namespace).Create(m)
+    if err != nil {
+        log.Error("create master", "error", err)
+    }
+
+    _, err = c.clientset.ExtensionsV1beta1().ReplicaSets(p.ObjectMeta.Namespace).Create(p)
+    if err != nil {
+        log.Error("create pserver", "error", err)
+    }
+
+    _, err = c.clientset.BatchV1().Jobs(t.ObjectMeta.Namespace).Create(t)
+    if err != nil {
+        log.Error("create trainer", "error", err)
+    }
 }
 
 func (c *Controller) onUpdate(oldObj, newObj interface{}) {
diff --git a/pkg/jobparser.go b/pkg/jobparser.go
index 0f7e2805..e3cb6e55 100644
--- a/pkg/jobparser.go
+++ b/pkg/jobparser.go
@@ -19,6 +19,7 @@ import (
     "fmt"
     "strconv"
 
+    log "github.com/inconshreveable/log15"
     edlresource "github.com/paddlepaddle/edl/pkg/resource"
     batchv1 "k8s.io/api/batch/v1"
     "k8s.io/api/core/v1"
@@ -86,7 +87,10 @@ func (p *DefaultJobParser) ParseToPserver(job *edlresource.TrainingJob) *v1beta1
             Kind:       "extensions/v1beta1",
             APIVersion: "ReplicaSet",
         },
-        ObjectMeta: job.ObjectMeta,
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      job.ObjectMeta.Name + "-pserver",
+            Namespace: job.ObjectMeta.Namespace,
+        },
         Spec: v1beta1.ReplicaSetSpec{
             Replicas: &replicas,
             Template: v1.PodTemplateSpec{
@@ -235,14 +239,17 @@ func (p *DefaultJobParser) ParseToMaster(job *edlresource.TrainingJob) *v1beta1.
 // general functions that pserver, trainer use the same
 // -----------------------------------------------------------------------
 func podPorts(job *edlresource.TrainingJob) []v1.ContainerPort {
+    log.Debug("get pod ports", "portsnum", job.Spec.PortsNum, "sparse", job.Spec.PortsNumForSparse)
     portsTotal := job.Spec.PortsNum + job.Spec.PortsNumForSparse
-    ports := make([]v1.ContainerPort, 8)
+    ports := make([]v1.ContainerPort, portsTotal)
     basePort := int32(job.Spec.Port)
     for i := 0; i < portsTotal; i++ {
-        ports = append(ports, v1.ContainerPort{
+        log.Debug("adding port", "base", basePort,
+            "total", portsTotal)
+        ports[i] = v1.ContainerPort{
             Name:          fmt.Sprintf("jobport-%d", basePort),
             ContainerPort: basePort,
-        })
+        }
         basePort++
     }
     return ports
diff --git a/pkg/trainingjober.go b/pkg/trainingjober.go
deleted file mode 100644
index 476e0b30..00000000
--- a/pkg/trainingjober.go
+++ /dev/null
@@ -1,207 +0,0 @@
-/* Copyright (c) 2016 PaddlePaddle Authors All Rights Reserve.
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License. */
-
-package edl
-
-import (
-    "fmt"
-    "time"
-
-    log "github.com/inconshreveable/log15"
-    edlresource "github.com/paddlepaddle/edl/pkg/resource"
-)
-
-const (
-    defaultLoopNum = 3
-    defaultDur     = 1 * time.Second
-)
-
-// TrainingJober mananges TraingJobs.
-type TrainingJober struct {
-    cluster *Cluster
-}
-
-// NewTrainingJober create a TrainingJober.
-func NewTrainingJober(c *Cluster) *TrainingJober {
-    return &TrainingJober{
-        cluster: c,
-    }
-}
-
-func (c *TrainingJober) cleanupPserver(namespace, jobname string) error {
-    name := jobname + "-pserver"
-    err := c.cluster.DeleteReplicaSet(namespace, name)
-    if err != nil {
-        return fmt.Errorf("delete pserver namespace:%s name:%s error:%v",
-            namespace, name, err)
-    }
-
-    log.Error(fmt.Sprintf("delete pserver namespace:%s name:%s",
-        namespace, name))
-    return nil
-}
-
-func (c *TrainingJober) cleanupMaster(namespace, jobname string) error {
-    name := jobname + "-master"
-    err := c.cluster.DeleteReplicaSet(namespace, name)
-    if err != nil {
-        return fmt.Errorf("delete master namespace:%s name:%s error:%v",
-            namespace, name, err)
-    }
-
-    log.Error(fmt.Sprintf("delete master namespace:%s name:%s",
-        namespace, name))
-    return nil
-}
-
-func (c *TrainingJober) cleanupTrainer(namespace, jobname string) error {
-    name := jobname + "-trainer"
-    err := c.cluster.DeleteTrainerJob(namespace, name)
-    if err != nil {
-        return fmt.Errorf("delete trainerjob namespace:%v name:%v error:%v",
-            namespace, name, err)
-    }
-
-    log.Error(fmt.Sprintf("delete trainerjob namespace:%s name:%s",
-        namespace, name))
-    return nil
-}
-
-func (c *TrainingJober) createMaster(job *edlresource.TrainingJob) error {
-    var parser DefaultJobParser
-    m := parser.ParseToMaster(job)
-
-    _, err := c.cluster.CreateReplicaSet(m)
-    if err != nil {
-        e := fmt.Errorf("create master namespace:%v name:%v error:%v",
-            job.ObjectMeta.Namespace, job.ObjectMeta.Name, err)
-        log.Error(e.Error())
-        return e
-    }
-
-    return nil
-}
-
-func (c *TrainingJober) createPserver(job *edlresource.TrainingJob) error {
-    var parser DefaultJobParser
-    p := parser.ParseToPserver(job)
-
-    _, err := c.cluster.CreateReplicaSet(p)
-    if err != nil {
-        e := fmt.Errorf("create pserver namespace:%v name:%v error:%v",
-            job.ObjectMeta.Namespace, job.ObjectMeta.Name, err)
-        log.Error(e.Error())
-        return e
-    }
-    return nil
-}
-
-func (c *TrainingJober) createTrainer(job *edlresource.TrainingJob) error {
-    var parser DefaultJobParser
-    t := parser.ParseToTrainer(job)
-
-    _, err := c.cluster.CreateJob(t)
-    if err != nil {
-        e := fmt.Errorf("create trainerjob namespace:%v name:%v error:%v",
-            job.ObjectMeta.Namespace, job.ObjectMeta.Name, err)
-        log.Error(e.Error())
-        return e
-    }
-
-    return nil
-}
-
-// Complete clears master and pserver resources.
-func (c *TrainingJober) Complete(job *edlresource.TrainingJob) {
-    c.cleanupPserver(job.ObjectMeta.Namespace,
-        job.ObjectMeta.Name)
-
-    c.cleanupMaster(job.ObjectMeta.Namespace,
-        job.ObjectMeta.Name)
-}
-
-// Destroy destroys resource and pods.
-func (c *TrainingJober) Destroy(job *edlresource.TrainingJob) {
-    c.Complete(job)
-
-    c.cleanupTrainer(job.ObjectMeta.Namespace,
-        job.ObjectMeta.Name)
-}
-
-func (c *TrainingJober) checkAndCreate(job *edlresource.TrainingJob) error {
-    tname := job.ObjectMeta.Name + "-trainer"
-    mname := job.ObjectMeta.Name + "-master"
-    pname := job.ObjectMeta.Name + "-pserver"
-    namespace := job.ObjectMeta.Namespace
-
-    t, terr := c.cluster.GetTrainerJobByName(namespace, tname)
-    m, merr := c.cluster.GetReplicaSet(namespace, mname)
-    p, perr := c.cluster.GetReplicaSet(namespace, pname)
-
-    if terr != nil ||
-        merr != nil ||
-        perr != nil {
-        err := fmt.Errorf("trainerjob_err:%v master_err:%v pserver_err:%v",
-            terr, merr, perr)
-        log.Error(err.Error())
-        return err
-    }
-
-    if m == nil {
-        if err := c.createMaster(job); err != nil {
-            return fmt.Errorf("namespace:%v create master:%v error:%v",
-                namespace, mname, err)
-        }
-    }
-
-    if t == nil {
-        if err := c.createTrainer(job); err != nil {
-            if m == nil {
-                c.cleanupMaster(namespace, mname)
-            }
-            return fmt.Errorf("namespace:%v create trainer:%v error:%v",
-                namespace, tname, err)
-        }
-    }
-
-    if p == nil {
-        if err := c.createPserver(job); err != nil {
-            if m == nil {
-                c.cleanupMaster(namespace, mname)
-            }
-
-            if t == nil {
-                c.cleanupTrainer(namespace, tname)
-            }
-            return fmt.Errorf("namespace:%v create pserver:%v error:%v",
-                namespace, pname, err)
-        }
-    }
-
-    return nil
-}
-
-// Ensure try to make sure trainer, pserver, master exists.
-func (c *TrainingJober) Ensure(job *edlresource.TrainingJob) error {
-    var err error
-    for i := 0; i < defaultLoopNum; i++ {
-        err = c.checkAndCreate(job)
-        if err == nil {
-            return nil
-        }
-        time.Sleep(defaultDur)
-    }
-
-    return err
-}
diff --git a/thirdpartyresource.yaml b/thirdpartyresource.yaml
new file mode 100644
index 00000000..08bfacd2
--- /dev/null
+++ b/thirdpartyresource.yaml
@@ -0,0 +1,7 @@
+apiVersion: extensions/v1beta1
+kind: ThirdPartyResource
+metadata:
+  name: training-job.paddlepaddle.org
+description: "PaddlePaddle TrainingJob operator"
+versions:
+- name: v1