From 8a973edfdba9caf220b137ac276cd71621f212a8 Mon Sep 17 00:00:00 2001 From: wuyi05 Date: Thu, 13 Jul 2017 10:15:54 +0800 Subject: [PATCH 1/5] fault tolerant job --- docker/k8s_tools.py | 9 +++++++++ docker/paddle_k8s | 21 +++++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/docker/k8s_tools.py b/docker/k8s_tools.py index 16f265e4..626c9e43 100644 --- a/docker/k8s_tools.py +++ b/docker/k8s_tools.py @@ -41,6 +41,13 @@ def fetch_pserver_ips(): pserver_ips = [item[1] for item in pod_list] return ",".join(pserver_ips) +def fetch_master_ip(): + label_selector = "paddle-job-master=%s" % PADDLE_JOB_NAME + pod_list = fetch_pods_info(label_selector) + master_ip = "" + if len(pod_list) >=1: + master_ip = pod_list[0][1] + return master_ip def fetch_trainer_id(): label_selector = "paddle-job=%s" % PADDLE_JOB_NAME @@ -60,5 +67,7 @@ def fetch_trainer_id(): print fetch_pserver_ips() elif command == "fetch_trainer_id": print fetch_trainer_id() + elif command == "fetch_master_ip": + print fetch_master_ip() elif command == "wait_pods_running": wait_pods_running(sys.argv[2], sys.argv[3]) diff --git a/docker/paddle_k8s b/docker/paddle_k8s index 379f3437..436067ae 100755 --- a/docker/paddle_k8s +++ b/docker/paddle_k8s @@ -11,6 +11,21 @@ start_pserver() { --num_gradient_servers=$PADDLE_INIT_NUM_GRADIENT_SERVERS } +start_new_pserver() { + export MASTER_IP=$(python /root/k8s_tools.py fetch_master_ip) + /usr/bin/pserver \ + -port=$PADDLE_INIT_PORT \ + -num-pservers=$PSERVERS \ + -log-level=debug \ + -etcd-endpoint=http://$PADDLE_INIT_MASTER_IP:2379 +} + +start_master() { + /usr/bin/master \ + -port=8080 \ + -endpoints=http://127.0.0.1:2379 +} + check_trainer_ret() { ret=$1 echo "job returned $ret...setting pod return message..." @@ -88,6 +103,12 @@ case "$1" in start_trainer) start_trainer $2 ;; + start_new_pserver) + start_new_pserver + ;; + start_master) + start_master + ;; --help) usage ;; From a51d12e276ddcaa684a55cdc235cb2dc2f35c158 Mon Sep 17 00:00:00 2001 From: wuyi05 Date: Mon, 17 Jul 2017 14:16:58 +0800 Subject: [PATCH 2/5] ft job --- paddlecloud/paddlejob/paddle_job.py | 74 ++++++++++++++++++++++++++++- 1 file changed, 72 insertions(+), 2 deletions(-) diff --git a/paddlecloud/paddlejob/paddle_job.py b/paddlecloud/paddlejob/paddle_job.py index ecdb58c0..2db6b4c2 100644 --- a/paddlecloud/paddlejob/paddle_job.py +++ b/paddlecloud/paddlejob/paddle_job.py @@ -3,6 +3,7 @@ import os __all__ = ["PaddleJob"] DEFAULT_PADDLE_PORT=7164 +DEFAULT_MASTER_PORT=8080 class PaddleJob(object): """ @@ -24,7 +25,8 @@ def __init__(self, gpu=0, volumes=[], registry_secret=None, - envs = {}): + envs = {}, + new_pserver=True): self._ports_num=1 self._ports_num_for_sparse=1 @@ -46,6 +48,11 @@ def __init__(self, self._registry_secret = registry_secret self._passes = passes self._usr_envs = envs + # master resources are static + self._mastercpu = 1 + self._mastermemory = "300Mi" + # use new pserver for tolerant + self._new_pserver = new_pserver @property def pservers(self): @@ -59,6 +66,9 @@ def parallelism(self): def runtime_image(self): return self._image + def _get_master_name(self): + return "%s-master" % self._name + def _get_pserver_name(self): return "%s-pserver" % self._name @@ -101,11 +111,28 @@ def _get_pserver_container_ports(self): port += 1 return ports + def _get_master_container_ports(self): + ports = [] + port = DEFAULT_MASTER_PORT + for i in xrange(self._ports_num + self._ports_num_for_sparse): + ports.append({"containerPort":port, "name":"jobport-%d" % i}) + port += 1 + return ports + + def _get_master_labels(self): + return {"paddle-job-master": self._name} + def _get_pserver_labels(self): return {"paddle-job-pserver": self._name} + def _get_master_entrypoint(self): + return ["paddle_k8s", "start_master"] + def _get_pserver_entrypoint(self): - return ["paddle_k8s", "start_pserver"] + if self._new_pserver: + return ["paddle_k8s", "start_pserver"] + else: + return ["paddle_k8s", "start_new_pserver"] def _get_trainer_entrypoint(self): if self._entry: @@ -128,6 +155,49 @@ def _get_trainer_volume_mounts(self): volume_mounts.append(item["volume_mount"]) return volume_mounts + def new_master_job(self): + """ + return: Master ReplicaSet + """ + rs = { + "apiVersion": "extensions/v1beta1", + "kind": "ReplicaSet", + "metadata":{ + "name": self._get_master_name(), + }, + "spec":{ + "replicas": 1, # NOTE: always 1 replica of master + "template": { + "metadata": { + "labels": self._get_master_labels() + }, + "spec": { + # mount trainer volumes to dispatch datasets + "volumes": self._get_trainer_volumes(), + "containers":[{ + "name": self._name, + "image": self._image, + "ports": self._get_master_container_ports(), + "env": self.get_env(), + "volumeMounts": self._get_trainer_volume_mounts(), + "command": self._get_master_entrypoint(), + "resources": { + "requests": { + "memory": str(self._mastermemory), + "cpu": str(self._mastercpu) + }, + "limits": { + "memory": str(self._mastermemory), + "cpu": str(self._mastercpu) + } + } + }] + } + } + } + } + return rs + def new_trainer_job(self): """ return: Trainer job, it's a Kubernetes Job From 41bc0f9f4dbada78af227bb6f14b5ba89689387a Mon Sep 17 00:00:00 2001 From: wuyi05 Date: Wed, 19 Jul 2017 11:26:54 +0800 Subject: [PATCH 3/5] update --- go/paddlecloud/submit.go | 9 +++++++-- paddlecloud/paddlecloud/settings.py | 2 ++ paddlecloud/paddlejob/paddle_job.py | 13 ++++++++++++- paddlecloud/paddlejob/views.py | 30 +++++++++++++++++++++-------- 4 files changed, 43 insertions(+), 11 deletions(-) diff --git a/go/paddlecloud/submit.go b/go/paddlecloud/submit.go index fa62cf83..5b85c379 100644 --- a/go/paddlecloud/submit.go +++ b/go/paddlecloud/submit.go @@ -33,8 +33,12 @@ type SubmitCmd struct { Topology string `json:"topology"` Datacenter string `json:"datacenter"` Passes int `json:"passes"` - Image string `json:"image"` - Registry string `json:"registry"` + // docker image to run jobs + Image string `json:"image"` + Registry string `json:"registry"` + // Alpha features: + // TODO: separate API versions + FaultTolerant bool `json:"faulttolerant"` } // Name is subcommands name. @@ -66,6 +70,7 @@ func (p *SubmitCmd) SetFlags(f *flag.FlagSet) { f.IntVar(&p.Passes, "passes", 1, "Pass count for training job") f.StringVar(&p.Image, "image", "", "Runtime Docker image for the job") f.StringVar(&p.Registry, "registry", "", "Registry secret name for the runtime Docker image") + f.BoolVar(&p.FaultTolerant, "faulttolerant", false, "if true, use new fault-tolerant pservers") } // Execute submit command. diff --git a/paddlecloud/paddlecloud/settings.py b/paddlecloud/paddlecloud/settings.py index bf392fd9..9dc52300 100644 --- a/paddlecloud/paddlecloud/settings.py +++ b/paddlecloud/paddlecloud/settings.py @@ -317,3 +317,5 @@ # Path store all cuda, nvidia driver libs NVIDIA_LIB_PATH="/usr/local/nvidia/lib64" +# etcd image for fault-tolerant jobs +ETCD_IMAGE="quay.io/coreos/etcd:v3.2.1" diff --git a/paddlecloud/paddlejob/paddle_job.py b/paddlecloud/paddlejob/paddle_job.py index 2db6b4c2..775e9bc5 100644 --- a/paddlecloud/paddlejob/paddle_job.py +++ b/paddlecloud/paddlejob/paddle_job.py @@ -26,7 +26,8 @@ def __init__(self, volumes=[], registry_secret=None, envs = {}, - new_pserver=True): + new_pserver=True, + etcd_image="quay.io/coreos/etcd:v3.2.1"): self._ports_num=1 self._ports_num_for_sparse=1 @@ -53,6 +54,7 @@ def __init__(self, self._mastermemory = "300Mi" # use new pserver for tolerant self._new_pserver = new_pserver + self._etcd_image = etcd_image @property def pservers(self): @@ -191,6 +193,15 @@ def new_master_job(self): "cpu": str(self._mastercpu) } } + }, { + "name": self._name + "-etcd", + "image": self._etcd_image, + "command": ["etcd", "-name", "etcd0", "-advertise-client-urls", "http://$(POD_IP):2379,http://$(POD_IP):4001", "-listen-client-urls", "http://0.0.0.0:2379,http://0.0.0.0:4001", "-init ial-advertise-peer-urls", "http://$(POD_IP):2380", "-listen-peer-urls", "http://0.0.0.0:2380", "-initial-cluster", "etcd0=http://$(POD_IP):2380", "-initial-cluster-state", "new"], + "env": [{ + "name": "POD_IP", + "valueFrom": {"fieldRef": {"fieldPath": "status.podIP"}} + }] + }] } } diff --git a/paddlecloud/paddlejob/views.py b/paddlecloud/paddlejob/views.py index 70413757..40e0b67e 100644 --- a/paddlecloud/paddlejob/views.py +++ b/paddlecloud/paddlejob/views.py @@ -39,6 +39,7 @@ def post(self, request, format=None): obj = json.loads(request.body) topology = obj.get("topology", "") entry = obj.get("entry", "") + fault_tolerant = obj.get("faulttolerant", False) api_client = notebook.utils.get_user_api_client(username) if not topology and not entry: return utils.simple_response(500, "no topology or entry specified") @@ -127,7 +128,7 @@ def post(self, request, format=None): )) envs = {} envs.update({"PADDLE_CLOUD_CURRENT_DATACENTER": dc}) - + # ===================== create PaddleJob instance ====================== paddle_job = PaddleJob( name = job_name, job_package = package_in_pod, @@ -145,25 +146,38 @@ def post(self, request, format=None): registry_secret = registry_secret, volumes = volumes, envs = envs + new_pserver= + etcd_image=settings.ETCD_IMAGE ) + # ========== submit master ReplicaSet if using fault_tolerant feature == + # FIXME: alpha features in separate module + if fault_tolerant: + try: + ret = client.ExtensionsV1beta1Api(api_client=api_client).create_namespaced_replica_set( + namespace, + paddle_job.new_master_job()) + except ApiException, e: + logging.error("error submitting master job: %s", e) + return utils.simple_response(500, str(e)) + # ========================= submit pserver job ========================= try: ret = client.ExtensionsV1beta1Api(api_client=api_client).create_namespaced_replica_set( namespace, - paddle_job.new_pserver_job(), - pretty=True) + paddle_job.new_pserver_job()) except ApiException, e: - logging.error("error submitting pserver job: %s " % e) + logging.error("error submitting pserver job: %s ", e) return utils.simple_response(500, str(e)) - - #submit trainer job, it's Kubernetes Job + # ========================= submit trainer job ========================= try: ret = client.BatchV1Api(api_client=api_client).create_namespaced_job( namespace, - paddle_job.new_trainer_job(), - pretty=True) + paddle_job.new_trainer_job()) except ApiException, e: logging.error("error submitting trainer job: %s" % e) return utils.simple_response(500, str(e)) + + # TODO(typhoonzero): stop master and pservers when job finish or fails + return utils.simple_response(200, "") def delete(self, request, format=None): From be2cdf4f731d12f7b5e2e1349410e3335562135b Mon Sep 17 00:00:00 2001 From: wuyi05 Date: Wed, 19 Jul 2017 17:01:31 +0800 Subject: [PATCH 4/5] tested job submit with ft job --- go/paddlecloud/submit.go | 5 +++++ paddlecloud/paddlejob/paddle_job.py | 4 ++-- paddlecloud/paddlejob/views.py | 29 ++++++++++++++++++++++++++--- 3 files changed, 33 insertions(+), 5 deletions(-) diff --git a/go/paddlecloud/submit.go b/go/paddlecloud/submit.go index 85702356..2b661239 100644 --- a/go/paddlecloud/submit.go +++ b/go/paddlecloud/submit.go @@ -112,6 +112,7 @@ func NewSubmitter(cmd *SubmitCmd) *Submitter { func (s *Submitter) Submit(jobPackage string, jobName string) error { // if jobPackage is not a local dir, skip uploading package. _, pkgerr := os.Stat(jobPackage) + fmt.Println("1") if pkgerr == nil { dest := path.Join("/pfs", Config.ActiveConfig.Name, "home", Config.ActiveConfig.Username, "jobs", jobName) if !strings.HasSuffix(jobPackage, "/") { @@ -124,20 +125,24 @@ func (s *Submitter) Submit(jobPackage string, jobName string) error { } else if os.IsNotExist(pkgerr) { glog.Warning("jobpackage not a local dir, skip upload.") } + fmt.Println("2") // 2. call paddlecloud server to create kubernetes job jsonString, err := json.Marshal(s.args) if err != nil { return err } + fmt.Println("3") glog.V(10).Infof("Submitting job: %s to %s\n", jsonString, Config.ActiveConfig.Endpoint+"/api/v1/jobs") respBody, err := restclient.PostCall(Config.ActiveConfig.Endpoint+"/api/v1/jobs/", jsonString) if err != nil { return err } + fmt.Println("4") var respObj interface{} if err = json.Unmarshal(respBody, &respObj); err != nil { return err } + fmt.Println("5") // FIXME: Return an error if error message is not empty. Use response code instead errMsg := respObj.(map[string]interface{})["msg"].(string) if len(errMsg) > 0 { diff --git a/paddlecloud/paddlejob/paddle_job.py b/paddlecloud/paddlejob/paddle_job.py index 775e9bc5..b082db0c 100644 --- a/paddlecloud/paddlejob/paddle_job.py +++ b/paddlecloud/paddlejob/paddle_job.py @@ -131,7 +131,7 @@ def _get_master_entrypoint(self): return ["paddle_k8s", "start_master"] def _get_pserver_entrypoint(self): - if self._new_pserver: + if not self._new_pserver: return ["paddle_k8s", "start_pserver"] else: return ["paddle_k8s", "start_new_pserver"] @@ -196,7 +196,7 @@ def new_master_job(self): }, { "name": self._name + "-etcd", "image": self._etcd_image, - "command": ["etcd", "-name", "etcd0", "-advertise-client-urls", "http://$(POD_IP):2379,http://$(POD_IP):4001", "-listen-client-urls", "http://0.0.0.0:2379,http://0.0.0.0:4001", "-init ial-advertise-peer-urls", "http://$(POD_IP):2380", "-listen-peer-urls", "http://0.0.0.0:2380", "-initial-cluster", "etcd0=http://$(POD_IP):2380", "-initial-cluster-state", "new"], + "command": ["etcd", "-name", "etcd0", "-advertise-client-urls", "http://$(POD_IP):2379,http://$(POD_IP):4001", "-listen-client-urls", "http://0.0.0.0:2379,http://0.0.0.0:4001", "-initial-advertise-peer-urls", "http://$(POD_IP):2380", "-listen-peer-urls", "http://0.0.0.0:2380", "-initial-cluster", "etcd0=http://$(POD_IP):2380", "-initial-cluster-state", "new"], "env": [{ "name": "POD_IP", "valueFrom": {"fieldRef": {"fieldPath": "status.podIP"}} diff --git a/paddlecloud/paddlejob/views.py b/paddlecloud/paddlejob/views.py index 40e0b67e..cb79a953 100644 --- a/paddlecloud/paddlejob/views.py +++ b/paddlecloud/paddlejob/views.py @@ -145,9 +145,9 @@ def post(self, request, format=None): passes = obj.get("passes", 1), registry_secret = registry_secret, volumes = volumes, - envs = envs - new_pserver= - etcd_image=settings.ETCD_IMAGE + envs = envs, + new_pserver = fault_tolerant, + etcd_image = settings.ETCD_IMAGE ) # ========== submit master ReplicaSet if using fault_tolerant feature == # FIXME: alpha features in separate module @@ -236,6 +236,29 @@ def delete(self, request, format=None): except ApiException, e: logging.error("error deleting pserver pods: %s" % str(e)) delete_status.append(str(e)) + + # delete master rs + master_name = jobname + "-master" + try: + u_status = client.ExtensionsV1beta1Api(api_client=api_client)\ + .delete_namespaced_replica_set(master_name, namespace, {}) + except ApiException, e: + logging.error("error deleting master: %s" % str(e)) + delete_status.append(str(e)) + + # delete master pods + try: + # master replica set has label with jobname + job_pod_list = client.CoreV1Api(api_client=api_client)\ + .list_namespaced_pod(namespace, + label_selector="paddle-job-master=%s"%jobname) + for i in job_pod_list.items: + u_status = client.CoreV1Api(api_client=api_client)\ + .delete_namespaced_pod(i.metadata.name, namespace, {}) + except ApiException, e: + logging.error("error deleting master pods: %s" % str(e)) + delete_status.append(str(e)) + if len(delete_status) > 0: retcode = 500 else: From bd503c2361e2e0db3866901236a0dbbda9e27f63 Mon Sep 17 00:00:00 2001 From: wuyi05 Date: Thu, 20 Jul 2017 10:00:21 +0800 Subject: [PATCH 5/5] update --- go/paddlecloud/submit.go | 5 ----- paddlecloud/paddlejob/paddle_job.py | 6 +++--- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/go/paddlecloud/submit.go b/go/paddlecloud/submit.go index 2b661239..85702356 100644 --- a/go/paddlecloud/submit.go +++ b/go/paddlecloud/submit.go @@ -112,7 +112,6 @@ func NewSubmitter(cmd *SubmitCmd) *Submitter { func (s *Submitter) Submit(jobPackage string, jobName string) error { // if jobPackage is not a local dir, skip uploading package. _, pkgerr := os.Stat(jobPackage) - fmt.Println("1") if pkgerr == nil { dest := path.Join("/pfs", Config.ActiveConfig.Name, "home", Config.ActiveConfig.Username, "jobs", jobName) if !strings.HasSuffix(jobPackage, "/") { @@ -125,24 +124,20 @@ func (s *Submitter) Submit(jobPackage string, jobName string) error { } else if os.IsNotExist(pkgerr) { glog.Warning("jobpackage not a local dir, skip upload.") } - fmt.Println("2") // 2. call paddlecloud server to create kubernetes job jsonString, err := json.Marshal(s.args) if err != nil { return err } - fmt.Println("3") glog.V(10).Infof("Submitting job: %s to %s\n", jsonString, Config.ActiveConfig.Endpoint+"/api/v1/jobs") respBody, err := restclient.PostCall(Config.ActiveConfig.Endpoint+"/api/v1/jobs/", jsonString) if err != nil { return err } - fmt.Println("4") var respObj interface{} if err = json.Unmarshal(respBody, &respObj); err != nil { return err } - fmt.Println("5") // FIXME: Return an error if error message is not empty. Use response code instead errMsg := respObj.(map[string]interface{})["msg"].(string) if len(errMsg) > 0 { diff --git a/paddlecloud/paddlejob/paddle_job.py b/paddlecloud/paddlejob/paddle_job.py index b082db0c..b0f713c7 100644 --- a/paddlecloud/paddlejob/paddle_job.py +++ b/paddlecloud/paddlejob/paddle_job.py @@ -4,6 +4,7 @@ __all__ = ["PaddleJob"] DEFAULT_PADDLE_PORT=7164 DEFAULT_MASTER_PORT=8080 +DEFAULT_ETCD_PORT=2379 class PaddleJob(object): """ @@ -116,9 +117,8 @@ def _get_pserver_container_ports(self): def _get_master_container_ports(self): ports = [] port = DEFAULT_MASTER_PORT - for i in xrange(self._ports_num + self._ports_num_for_sparse): - ports.append({"containerPort":port, "name":"jobport-%d" % i}) - port += 1 + ports.append({"containerPort": DEFAULT_MASTER_PORT, "name":"master-port"}) + ports.append({"containerPort": DEFAULT_ETCD_PORT, "name":"etcd-port"}) return ports def _get_master_labels(self):