Fault tolerant job #212

Merged 7 commits on Jul 20, 2017

Changes from all commits
9 changes: 9 additions & 0 deletions docker/k8s_tools.py
@@ -41,6 +41,13 @@ def fetch_pserver_ips():
    pserver_ips = [item[1] for item in pod_list]
    return ",".join(pserver_ips)

def fetch_master_ip():
    label_selector = "paddle-job-master=%s" % PADDLE_JOB_NAME
    pod_list = fetch_pods_info(label_selector)
    master_ip = ""
    if len(pod_list) >= 1:
        master_ip = pod_list[0][1]
    return master_ip

def fetch_trainer_id():
label_selector = "paddle-job=%s" % PADDLE_JOB_NAME
@@ -60,5 +67,7 @@ def fetch_trainer_id():
        print fetch_pserver_ips()
    elif command == "fetch_trainer_id":
        print fetch_trainer_id()
    elif command == "fetch_master_ip":
        print fetch_master_ip()
    elif command == "wait_pods_running":
        wait_pods_running(sys.argv[2], sys.argv[3])
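Note: fetch_pods_info is not part of this diff. Below is a minimal sketch of what such a helper could look like with the official kubernetes Python client; the NAMESPACE environment variable and the (phase, pod_ip) return shape are assumptions inferred from how fetch_master_ip and fetch_pserver_ips index into the result, not code from this PR.

# Hypothetical sketch of the helper fetch_master_ip() relies on; the real
# implementation lives elsewhere in docker/k8s_tools.py and may differ.
import os
from kubernetes import client, config

PADDLE_JOB_NAME = os.getenv("PADDLE_JOB_NAME", "")
NAMESPACE = os.getenv("NAMESPACE", "default")  # assumed env var

def fetch_pods_info(label_selector):
    # Running inside the cluster, so authenticate with the pod's service account.
    config.load_incluster_config()
    api = client.CoreV1Api()
    pods = api.list_namespaced_pod(NAMESPACE, label_selector=label_selector)
    # Return (phase, pod_ip) pairs; callers above read index 1 as the pod IP.
    return [(pod.status.phase, pod.status.pod_ip) for pod in pods.items]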
21 changes: 21 additions & 0 deletions docker/paddle_k8s
@@ -11,6 +11,21 @@ start_pserver() {
    --num_gradient_servers=$PADDLE_INIT_NUM_GRADIENT_SERVERS
}

start_new_pserver() {
  export MASTER_IP=$(python /root/k8s_tools.py fetch_master_ip)
  /usr/bin/pserver \
    -port=$PADDLE_INIT_PORT \
    -num-pservers=$PSERVERS \
    -log-level=debug \
    -etcd-endpoint=http://$MASTER_IP:2379
}

start_master() {
  /usr/bin/master \
    -port=8080 \
    -endpoints=http://127.0.0.1:2379
}

check_trainer_ret() {
  ret=$1
  echo "job returned $ret...setting pod return message..."
@@ -92,6 +107,12 @@ case "$1" in
  start_trainer)
    start_trainer $2
    ;;
  start_new_pserver)
    start_new_pserver
    ;;
  start_master)
    start_master
    ;;
  --help)
    usage
    ;;
9 changes: 7 additions & 2 deletions go/paddlecloud/submit.go
@@ -34,8 +34,12 @@ type SubmitCmd struct {
	Topology   string `json:"topology"`
	Datacenter string `json:"datacenter"`
	Passes     int    `json:"passes"`
	// docker image to run jobs
	Image    string `json:"image"`
	Registry string `json:"registry"`
	// Alpha features:
	// TODO: separate API versions
	FaultTolerant bool `json:"faulttolerant"`
}

// Name is subcommands name.
@@ -67,6 +71,7 @@ func (p *SubmitCmd) SetFlags(f *flag.FlagSet) {
f.IntVar(&p.Passes, "passes", 1, "Pass count for training job")
f.StringVar(&p.Image, "image", "", "Runtime Docker image for the job")
f.StringVar(&p.Registry, "registry", "", "Registry secret name for the runtime Docker image")
f.BoolVar(&p.FaultTolerant, "faulttolerant", false, "if true, use new fault-tolerant pservers")
}

// Execute submit command.
2 changes: 2 additions & 0 deletions paddlecloud/paddlecloud/settings.py
@@ -319,3 +319,5 @@

# Path store all cuda, nvidia driver libs
NVIDIA_LIB_PATH="/usr/local/nvidia/lib64"
# etcd image for fault-tolerant jobs
ETCD_IMAGE="quay.io/coreos/etcd:v3.2.1"
85 changes: 83 additions & 2 deletions paddlecloud/paddlejob/paddle_job.py
@@ -3,6 +3,8 @@
import os
__all__ = ["PaddleJob"]
DEFAULT_PADDLE_PORT=7164
DEFAULT_MASTER_PORT=8080
DEFAULT_ETCD_PORT=2379

class PaddleJob(object):
"""
@@ -24,7 +26,9 @@ def __init__(self,
                 gpu=0,
                 volumes=[],
                 registry_secret=None,
                 envs = {},
                 new_pserver=True,
                 etcd_image="quay.io/coreos/etcd:v3.2.1"):

        self._ports_num=1
        self._ports_num_for_sparse=1
@@ -46,6 +50,12 @@ def __init__(self,
        self._registry_secret = registry_secret
        self._passes = passes
        self._usr_envs = envs
        # master resources are static
        self._mastercpu = 1
        self._mastermemory = "300Mi"
        # use the new pserver binary for fault tolerance
        self._new_pserver = new_pserver
        self._etcd_image = etcd_image

    @property
    def pservers(self):
@@ -59,6 +69,9 @@ def parallelism(self):
    def runtime_image(self):
        return self._image

    def _get_master_name(self):
        return "%s-master" % self._name

    def _get_pserver_name(self):
        return "%s-pserver" % self._name

@@ -101,11 +114,27 @@ def _get_pserver_container_ports(self):
            port += 1
        return ports

Review comment (Collaborator): Maybe the master pod should export both the etcd port and the master port?
Reply (Author): Done.

    def _get_master_container_ports(self):
        ports = []
        ports.append({"containerPort": DEFAULT_MASTER_PORT, "name": "master-port"})
        ports.append({"containerPort": DEFAULT_ETCD_PORT, "name": "etcd-port"})
        return ports

    def _get_master_labels(self):
        return {"paddle-job-master": self._name}

    def _get_pserver_labels(self):
        return {"paddle-job-pserver": self._name}

    def _get_master_entrypoint(self):
        return ["paddle_k8s", "start_master"]

    def _get_pserver_entrypoint(self):
        if not self._new_pserver:
            return ["paddle_k8s", "start_pserver"]
        else:
            return ["paddle_k8s", "start_new_pserver"]

    def _get_trainer_entrypoint(self):
        if self._entry:
@@ -128,6 +157,58 @@ def _get_trainer_volume_mounts(self):
            volume_mounts.append(item["volume_mount"])
        return volume_mounts

    def new_master_job(self):
        """
        return: Master ReplicaSet
        """
        rs = {
            "apiVersion": "extensions/v1beta1",
            "kind": "ReplicaSet",
            "metadata": {
                "name": self._get_master_name(),
            },
            "spec": {
                "replicas": 1,  # NOTE: always 1 replica of master
                "template": {
                    "metadata": {
                        "labels": self._get_master_labels()
                    },
                    "spec": {
                        # mount trainer volumes to dispatch datasets
                        "volumes": self._get_trainer_volumes(),
                        "containers": [{
                            "name": self._name,
                            "image": self._image,
                            "ports": self._get_master_container_ports(),
                            "env": self.get_env(),
                            "volumeMounts": self._get_trainer_volume_mounts(),
                            "command": self._get_master_entrypoint(),
                            "resources": {
                                "requests": {
                                    "memory": str(self._mastermemory),
                                    "cpu": str(self._mastercpu)
                                },
                                "limits": {
                                    "memory": str(self._mastermemory),
                                    "cpu": str(self._mastercpu)
                                }
                            }
                        }, {
                            "name": self._name + "-etcd",
                            "image": self._etcd_image,
                            "command": [
                                "etcd", "-name", "etcd0",
                                "-advertise-client-urls", "http://$(POD_IP):2379,http://$(POD_IP):4001",
                                "-listen-client-urls", "http://0.0.0.0:2379,http://0.0.0.0:4001",
                                "-initial-advertise-peer-urls", "http://$(POD_IP):2380",
                                "-listen-peer-urls", "http://0.0.0.0:2380",
                                "-initial-cluster", "etcd0=http://$(POD_IP):2380",
                                "-initial-cluster-state", "new"
                            ],
                            "env": [{
                                "name": "POD_IP",
                                "valueFrom": {"fieldRef": {"fieldPath": "status.podIP"}}
                            }]
                        }]
                    }
                }
            }
        }
        return rs

    def new_trainer_job(self):
        """
        return: Trainer job, it's a Kubernetes Job
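To see how the pieces added above fit together: the master ReplicaSet carries the paddle-job-master=<job name> label that fetch_master_ip() in docker/k8s_tools.py selects on, and the master container exposes both the master and etcd ports, so start_new_pserver can point each pserver's etcd endpoint at the fetched master IP. A small illustrative snippet (not part of the diff; the job name "demo" and the sample IP are made up):

# Illustrative only -- mirrors the helpers above for a hypothetical job named "demo".
job_name = "demo"

master_labels = {"paddle-job-master": job_name}      # as in _get_master_labels()
label_selector = "paddle-job-master=%s" % job_name   # as used by fetch_master_ip()

master_ports = [
    {"containerPort": 8080, "name": "master-port"},  # DEFAULT_MASTER_PORT
    {"containerPort": 2379, "name": "etcd-port"},    # DEFAULT_ETCD_PORT
]

master_ip = "10.0.0.5"                               # stand-in for the fetched pod IP
etcd_endpoint = "http://%s:2379" % master_ip         # what start_new_pserver passes to pserver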
55 changes: 46 additions & 9 deletions paddlecloud/paddlejob/views.py
@@ -39,6 +39,7 @@ def post(self, request, format=None):
        obj = json.loads(request.body)
        topology = obj.get("topology", "")
        entry = obj.get("entry", "")
        fault_tolerant = obj.get("faulttolerant", False)
        api_client = notebook.utils.get_user_api_client(username)
        if not topology and not entry:
            return utils.simple_response(500, "no topology or entry specified")
@@ -127,7 +128,7 @@ def post(self, request, format=None):
        ))
        envs = {}
        envs.update({"PADDLE_CLOUD_CURRENT_DATACENTER": dc})
        # ===================== create PaddleJob instance ======================
        paddle_job = PaddleJob(
            name = job_name,
            job_package = package_in_pod,
@@ -144,26 +145,39 @@ def post(self, request, format=None):
            passes = obj.get("passes", 1),
            registry_secret = registry_secret,
            volumes = volumes,
            envs = envs,
            new_pserver = fault_tolerant,
            etcd_image = settings.ETCD_IMAGE
        )
        # ========== submit master ReplicaSet if using fault_tolerant feature ==
        # FIXME: alpha features in separate module
        if fault_tolerant:
            try:
                ret = client.ExtensionsV1beta1Api(api_client=api_client).create_namespaced_replica_set(
                    namespace,
                    paddle_job.new_master_job())
            except ApiException, e:
                logging.error("error submitting master job: %s", e)
                return utils.simple_response(500, str(e))
        # ========================= submit pserver job =========================
        try:
            ret = client.ExtensionsV1beta1Api(api_client=api_client).create_namespaced_replica_set(
                namespace,
                paddle_job.new_pserver_job())
        except ApiException, e:
            logging.error("error submitting pserver job: %s ", e)
            return utils.simple_response(500, str(e))

        # ========================= submit trainer job =========================
        try:
            ret = client.BatchV1Api(api_client=api_client).create_namespaced_job(
                namespace,
                paddle_job.new_trainer_job())
        except ApiException, e:
            logging.error("error submitting trainer job: %s" % e)
            return utils.simple_response(500, str(e))

        # TODO(typhoonzero): stop master and pservers when job finish or fails
        return utils.simple_response(200, "")

    def delete(self, request, format=None):
@@ -222,6 +236,29 @@ def delete(self, request, format=None):
        except ApiException, e:
            logging.error("error deleting pserver pods: %s" % str(e))
            delete_status.append(str(e))

        # delete master replica set
        master_name = jobname + "-master"
        try:
            u_status = client.ExtensionsV1beta1Api(api_client=api_client)\
                .delete_namespaced_replica_set(master_name, namespace, {})
        except ApiException, e:
            logging.error("error deleting master: %s" % str(e))
            delete_status.append(str(e))

        # delete master pods
        try:
            # the master replica set labels its pods with the job name
            job_pod_list = client.CoreV1Api(api_client=api_client)\
                .list_namespaced_pod(namespace,
                                     label_selector="paddle-job-master=%s" % jobname)
            for i in job_pod_list.items:
                u_status = client.CoreV1Api(api_client=api_client)\
                    .delete_namespaced_pod(i.metadata.name, namespace, {})
        except ApiException, e:
            logging.error("error deleting master pods: %s" % str(e))
            delete_status.append(str(e))

        if len(delete_status) > 0:
            retcode = 500
        else:
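For completeness, here is a hedged client-side sketch of how a fault-tolerant submission could reach the post() handler above. The endpoint URL, auth header, and most field values are placeholders for illustration; only the faulttolerant key, read via obj.get("faulttolerant", False), comes from this PR. The Go CLI sets the same field through the new -faulttolerant flag added in submit.go.

# Hypothetical submission sketch; URL, token, and most fields are placeholders.
import json
import requests  # assumed to be available on the client side

payload = {
    "name": "demo",
    "topology": "train.py",
    "entry": "",
    "passes": 1,
    "faulttolerant": True,   # enables the master ReplicaSet + new pserver path
}
resp = requests.post("https://cloud.example.com/api/v1/jobs/",   # hypothetical endpoint
                     data=json.dumps(payload),
                     headers={"Content-Type": "application/json",
                              "Authorization": "Token <token>"})
print resp.status_code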