diff --git a/docker/k8s_tools.py b/docker/k8s_tools.py
index 16f265e4..626c9e43 100644
--- a/docker/k8s_tools.py
+++ b/docker/k8s_tools.py
@@ -41,6 +41,13 @@ def fetch_pserver_ips():
     pserver_ips = [item[1] for item in pod_list]
     return ",".join(pserver_ips)
 
+def fetch_master_ip():
+    """Return the IP of the first pod labelled paddle-job-master=<job>, or "" if none matched."""
+    label_selector = "paddle-job-master=%s" % PADDLE_JOB_NAME
+    pod_list = fetch_pods_info(label_selector)
+    if not pod_list:
+        return ""
+    return pod_list[0][1]
 
 def fetch_trainer_id():
     label_selector = "paddle-job=%s" % PADDLE_JOB_NAME
@@ -60,5 +67,7 @@ def fetch_trainer_id():
         print fetch_pserver_ips()
     elif command == "fetch_trainer_id":
         print fetch_trainer_id()
+    elif command == "fetch_master_ip":
+        print fetch_master_ip()
     elif command == "wait_pods_running":
         wait_pods_running(sys.argv[2], sys.argv[3])
diff --git a/docker/paddle_k8s b/docker/paddle_k8s
index 0f08f804..7152f5f2 100755
--- a/docker/paddle_k8s
+++ b/docker/paddle_k8s
@@ -11,6 +11,21 @@ start_pserver() {
     --num_gradient_servers=$PADDLE_INIT_NUM_GRADIENT_SERVERS
 }
 
+start_new_pserver() {
+  export MASTER_IP=$(python /root/k8s_tools.py fetch_master_ip)  # etcd is reached through the master pod's IP
+  /usr/bin/pserver \
+    -port=$PADDLE_INIT_PORT \
+    -num-pservers=$PSERVERS \
+    -log-level=debug \
+    -etcd-endpoint=http://$MASTER_IP:2379
+}
+
+start_master() {
+  /usr/bin/master \
+    -port=8080 \
+    -endpoints=http://127.0.0.1:2379
+}
+
 check_trainer_ret() {
   ret=$1
   echo "job returned $ret...setting pod return message..."
@@ -92,6 +107,12 @@ case "$1" in
     start_trainer)
         start_trainer $2
         ;;
+    start_new_pserver)
+        start_new_pserver
+        ;;
+    start_master)
+        start_master
+        ;;
     --help)
         usage
         ;;
diff --git a/go/paddlecloud/submit.go b/go/paddlecloud/submit.go
index c890a2d2..85702356 100644
--- a/go/paddlecloud/submit.go
+++ b/go/paddlecloud/submit.go
@@ -34,8 +34,12 @@ type SubmitCmd struct {
 	Topology   string `json:"topology"`
 	Datacenter string `json:"datacenter"`
 	Passes     int    `json:"passes"`
-	Image      string `json:"image"`
-	Registry   string `json:"registry"`
+	// Image is the runtime Docker image for the job; Registry names its registry secret.
+	Image    string `json:"image"`
+	Registry string `json:"registry"`
+	// FaultTolerant is an alpha feature: when true the job uses the new fault-tolerant pservers.
+	// TODO: separate API versions
+	FaultTolerant bool `json:"faulttolerant"`
 }
 
 // Name is subcommands name.
@@ -67,6 +71,7 @@ func (p *SubmitCmd) SetFlags(f *flag.FlagSet) {
 	f.IntVar(&p.Passes, "passes", 1, "Pass count for training job")
 	f.StringVar(&p.Image, "image", "", "Runtime Docker image for the job")
 	f.StringVar(&p.Registry, "registry", "", "Registry secret name for the runtime Docker image")
+	f.BoolVar(&p.FaultTolerant, "faulttolerant", false, "if true, use new fault-tolerant pservers")
 }
 
 // Execute submit command.
diff --git a/paddlecloud/paddlecloud/settings.py b/paddlecloud/paddlecloud/settings.py
index 8aae31f6..70c5be81 100644
--- a/paddlecloud/paddlecloud/settings.py
+++ b/paddlecloud/paddlecloud/settings.py
@@ -319,3 +319,5 @@
 
 # Path store all cuda, nvidia driver libs
 NVIDIA_LIB_PATH="/usr/local/nvidia/lib64"
+# etcd image for fault-tolerant jobs
+ETCD_IMAGE="quay.io/coreos/etcd:v3.2.1"
diff --git a/paddlecloud/paddlejob/paddle_job.py b/paddlecloud/paddlejob/paddle_job.py
index ecdb58c0..b0f713c7 100644
--- a/paddlecloud/paddlejob/paddle_job.py
+++ b/paddlecloud/paddlejob/paddle_job.py
@@ -3,6 +3,8 @@ import os
 
 __all__ = ["PaddleJob"]
 DEFAULT_PADDLE_PORT=7164
+DEFAULT_MASTER_PORT=8080
+DEFAULT_ETCD_PORT=2379
 
 class PaddleJob(object):
     """
@@ -24,7 +26,9 @@ def __init__(self,
                  gpu=0,
                  volumes=[],
                  registry_secret=None,
-                 envs = {}):
+                 envs = {},
+                 new_pserver=True,
+                 etcd_image="quay.io/coreos/etcd:v3.2.1"):
 
         self._ports_num=1
         self._ports_num_for_sparse=1
@@ -46,6 +50,12 @@ def __init__(self,
         self._registry_secret = registry_secret
         self._passes = passes
         self._usr_envs = envs
+        # master resources are static (not configurable per job)
+        self._mastercpu = 1
+        self._mastermemory = "300Mi"
+        # when true, pservers start with "start_new_pserver" (fault-tolerant)
+        self._new_pserver = new_pserver
+        self._etcd_image = etcd_image
 
     @property
     def pservers(self):
@@ -59,6 +69,9 @@ def parallelism(self):
     def runtime_image(self):
         return self._image
 
+    def _get_master_name(self):
+        return "%s-master" % self._name
+
     def _get_pserver_name(self):
         return "%s-pserver" % self._name
 
@@ -101,11 +114,27 @@ def _get_pserver_container_ports(self):
             port += 1
         return ports
 
+    def _get_master_container_ports(self):
+        # ports exposed by the master container: the master port and the etcd port
+        return [
+            {"containerPort": DEFAULT_MASTER_PORT, "name":"master-port"},
+            {"containerPort": DEFAULT_ETCD_PORT, "name":"etcd-port"},
+        ]
+
+    def _get_master_labels(self):
+        return {"paddle-job-master": self._name}
+
     def _get_pserver_labels(self):
         return {"paddle-job-pserver": self._name}
 
+    def _get_master_entrypoint(self):
+        return ["paddle_k8s", "start_master"]
+
     def _get_pserver_entrypoint(self):
-        return ["paddle_k8s", "start_pserver"]
+        if not self._new_pserver:
+            return ["paddle_k8s", "start_pserver"]
+        else:
+            return ["paddle_k8s", "start_new_pserver"]
 
     def _get_trainer_entrypoint(self):
         if self._entry:
@@ -128,6 +157,58 @@ def _get_trainer_volume_mounts(self):
             volume_mounts.append(item["volume_mount"])
         return volume_mounts
 
+    def new_master_job(self):
+        """
+        return: Master ReplicaSet manifest (dict): one pod with a master container and an etcd container
+        """
+        rs = {
+            "apiVersion": "extensions/v1beta1",
+            "kind": "ReplicaSet",
+            "metadata":{
+                "name": self._get_master_name(),
+            },
+            "spec":{
+                "replicas": 1, # NOTE: always 1 replica of master
+                "template": {
+                    "metadata": {
+                        "labels": self._get_master_labels()
+                    },
+                    "spec": {
+                        # mount trainer volumes to dispatch datasets
+                        "volumes": self._get_trainer_volumes(),
+                        "containers":[{
+                            "name": self._name,
+                            "image": self._image,
+                            "ports": self._get_master_container_ports(),
+                            "env": self.get_env(),
+                            "volumeMounts": self._get_trainer_volume_mounts(),
+                            "command": self._get_master_entrypoint(),
+                            "resources": {
+                                "requests": {
+                                    "memory": str(self._mastermemory),
+                                    "cpu": str(self._mastercpu)
+                                },
+                                "limits": {
+                                    "memory": str(self._mastermemory),
+                                    "cpu": str(self._mastercpu)
+                                }
+                            }
+                        }, {
+                            "name": self._name + "-etcd",
+                            "image": self._etcd_image,
+                            "command": ["etcd", "-name", "etcd0", "-advertise-client-urls", "http://$(POD_IP):2379,http://$(POD_IP):4001", "-listen-client-urls", "http://0.0.0.0:2379,http://0.0.0.0:4001", "-initial-advertise-peer-urls", "http://$(POD_IP):2380", "-listen-peer-urls", "http://0.0.0.0:2380", "-initial-cluster", "etcd0=http://$(POD_IP):2380", "-initial-cluster-state", "new"],
+                            "env": [{
+                                "name": "POD_IP",
+                                "valueFrom": {"fieldRef": {"fieldPath": "status.podIP"}}
+                            }]
+
+                        }]
+                    }
+                }
+            }
+        }
+        return rs
+
     def new_trainer_job(self):
         """
         return: Trainer job, it's a Kubernetes Job
diff --git a/paddlecloud/paddlejob/views.py
b/paddlecloud/paddlejob/views.py
index 70413757..cb79a953 100644
--- a/paddlecloud/paddlejob/views.py
+++ b/paddlecloud/paddlejob/views.py
@@ -39,6 +39,7 @@ def post(self, request, format=None):
         obj = json.loads(request.body)
         topology = obj.get("topology", "")
         entry = obj.get("entry", "")
+        fault_tolerant = obj.get("faulttolerant", False)
         api_client = notebook.utils.get_user_api_client(username)
         if not topology and not entry:
             return utils.simple_response(500, "no topology or entry specified")
@@ -127,7 +128,7 @@ def post(self, request, format=None):
             ))
         envs = {}
         envs.update({"PADDLE_CLOUD_CURRENT_DATACENTER": dc})
-
+        # ===================== create PaddleJob instance ======================
         paddle_job = PaddleJob(
             name = job_name,
             job_package = package_in_pod,
@@ -144,26 +145,39 @@ def post(self, request, format=None):
             passes = obj.get("passes", 1),
             registry_secret = registry_secret,
             volumes = volumes,
-            envs = envs
+            envs = envs,
+            new_pserver = fault_tolerant,
+            etcd_image = settings.ETCD_IMAGE
         )
+        # ========== submit master ReplicaSet if using fault_tolerant feature ==
+        # FIXME: alpha features in separate module
+        if fault_tolerant:
+            try:
+                ret = client.ExtensionsV1beta1Api(api_client=api_client).create_namespaced_replica_set(
+                    namespace,
+                    paddle_job.new_master_job())
+            except ApiException, e:
+                logging.error("error submitting master job: %s", e)
+                return utils.simple_response(500, str(e))
+        # ========================= submit pserver job =========================
         try:
             ret = client.ExtensionsV1beta1Api(api_client=api_client).create_namespaced_replica_set(
                 namespace,
-                paddle_job.new_pserver_job(),
-                pretty=True)
+                paddle_job.new_pserver_job())
         except ApiException, e:
-            logging.error("error submitting pserver job: %s " % e)
+            logging.error("error submitting pserver job: %s ", e)
             return utils.simple_response(500, str(e))
-
-        #submit trainer job, it's Kubernetes Job
+        # ========================= submit trainer job =========================
         try:
             ret = client.BatchV1Api(api_client=api_client).create_namespaced_job(
                 namespace,
-                paddle_job.new_trainer_job(),
-                pretty=True)
+                paddle_job.new_trainer_job())
         except ApiException, e:
-            logging.error("error submitting trainer job: %s" % e)
+            logging.error("error submitting trainer job: %s", e)
             return utils.simple_response(500, str(e))
+
+        # TODO(typhoonzero): stop master and pservers when job finish or fails
+
         return utils.simple_response(200, "")
 
     def delete(self, request, format=None):
@@ -222,6 +236,32 @@ def delete(self, request, format=None):
         except ApiException, e:
             logging.error("error deleting pserver pods: %s" % str(e))
             delete_status.append(str(e))
+
+        # delete master rs
+        master_name = jobname + "-master"
+        try:
+            u_status = client.ExtensionsV1beta1Api(api_client=api_client)\
+                .delete_namespaced_replica_set(master_name, namespace, {})
+        except ApiException, e:
+            # a job submitted without "faulttolerant" has no master ReplicaSet,
+            # so "not found" is expected here and must not fail the delete
+            if e.status != 404:
+                logging.error("error deleting master: %s" % str(e))
+                delete_status.append(str(e))
+
+        # delete master pods
+        try:
+            # master replica set has label with jobname
+            job_pod_list = client.CoreV1Api(api_client=api_client)\
+                .list_namespaced_pod(namespace,
+                                     label_selector="paddle-job-master=%s"%jobname)
+            for i in job_pod_list.items:
+                u_status = client.CoreV1Api(api_client=api_client)\
+                    .delete_namespaced_pod(i.metadata.name, namespace, {})
+        except ApiException, e:
+            logging.error("error deleting master pods: %s" % str(e))
+            delete_status.append(str(e))
+
         if len(delete_status) > 0:
             retcode = 500
         else: