From 2f59037666bd2dd89fbba1f1dcad97c8be712652 Mon Sep 17 00:00:00 2001 From: wuyi05 Date: Tue, 4 Jul 2017 18:06:09 +0800 Subject: [PATCH 1/2] prettify output --- docker/paddle_k8s | 21 ++++++++++- go/paddlecloud/get.go | 65 ++++++++++++++++++++++++++++----- paddlecloud/paddlecloud/urls.py | 1 + paddlecloud/paddlejob/views.py | 13 +++++++ 4 files changed, 90 insertions(+), 10 deletions(-) diff --git a/docker/paddle_k8s b/docker/paddle_k8s index d910025d..379f3437 100755 --- a/docker/paddle_k8s +++ b/docker/paddle_k8s @@ -11,6 +11,23 @@ start_pserver() { --num_gradient_servers=$PADDLE_INIT_NUM_GRADIENT_SERVERS } +check_trainer_ret() { + ret=$1 + echo "job returned $ret...setting pod return message..." + echo "===============================" + if [ $ret -eq 136 ] ; then + echo "Error Arithmetic Operation(Floating Point Exception)" > /dev/termination-log + elif [ $ret -eq 139 ] ; then + echo "Segmentation Fault" > /dev/termination-log + elif [ $ret -eq 1 ] ; then + echo "General Error" > /dev/termination-log + elif [ $ret -eq 134 ] ; then + echo "Program Abort" > /dev/termination-log + fi + echo "termination log wroted..." + exit $ret +} + start_trainer() { python /root/k8s_tools.py wait_pods_running paddle-job-pserver=${PADDLE_JOB_NAME} ${PSERVERS} python /root/k8s_tools.py wait_pods_running paddle-job=${PADDLE_JOB_NAME} ${TRAINERS} @@ -47,9 +64,11 @@ start_trainer() { --save_dir=$OUTPUT \ --pservers=$PADDLE_INIT_PSERVERS \ --num_gradient_servers=$PADDLE_INIT_NUM_GRADIENT_SERVERS + check_trainer_ret $? ;; "v2") - ${ENTRY} + sh -c "${ENTRY}" + check_trainer_ret $? ;; *) ;; diff --git a/go/paddlecloud/get.go b/go/paddlecloud/get.go index be206ee2..babca708 100644 --- a/go/paddlecloud/get.go +++ b/go/paddlecloud/get.go @@ -77,18 +77,28 @@ func workers(jobname string) error { fmt.Fprintf(os.Stderr, "bad server return: %s", respBody) return err } + w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0) - fmt.Fprintln(w, "NAME\tSTATUS\tSTART\t") + fmt.Fprintln(w, "NAME\tSTATUS\tSTART\tEXIT_CODE\tMSG\t") if err != nil { fmt.Fprintf(os.Stderr, "error parsing: %s", err) return err } for _, item := range respObj.(map[string]interface{})["items"].([]interface{}) { - fmt.Fprintf(w, "%s\t%s\t%v\t\n", + var exitCode, msg interface{} + terminateState := item.(map[string]interface{})["status"].(map[string]interface{})["container_statuses"].([]interface{})[0].(map[string]interface{})["state"].(map[string]interface{})["terminated"] + + if terminateState != nil { + exitCode = terminateState.(map[string]interface{})["exit_code"] + msg = terminateState.(map[string]interface{})["message"] + } + + fmt.Fprintf(w, "%s\t%s\t%v\t%v\t%v\t\n", item.(map[string]interface{})["metadata"].(map[string]interface{})["name"].(string), item.(map[string]interface{})["status"].(map[string]interface{})["phase"].(string), - item.(map[string]interface{})["status"].(map[string]interface{})["start_time"]) + item.(map[string]interface{})["status"].(map[string]interface{})["start_time"], + exitCode, msg) } w.Flush() return nil @@ -122,34 +132,71 @@ func registry() error { w.Flush() return err } + func jobs() error { - respBody, err := restclient.GetCall(Config.ActiveConfig.Endpoint+"/api/v1/jobs/", nil) + // NOTE: a job include pserver replicaset and a trainers job, display them + // get pserver replicaset + // "status": { + // "available_replicas": 1, + // "conditions": null, + // "fully_labeled_replicas": 1, + // "observed_generation": 1, + // "ready_replicas": 1, + // "replicas": 1 + var respObj interface{} + + respBody, err := restclient.GetCall(Config.ActiveConfig.Endpoint+"/api/v1/pservers/", nil) + if err != nil { + fmt.Fprintf(os.Stderr, "error getting pservers: %v\n", err) + return err + } + err = json.Unmarshal(respBody, &respObj) + if err != nil { + return err + } + pserverItems := respObj.(map[string]interface{})["items"].([]interface{}) + + // get kubernetes jobs info + respBody, err = restclient.GetCall(Config.ActiveConfig.Endpoint+"/api/v1/jobs/", nil) if err != nil { fmt.Fprintf(os.Stderr, "error getting jobs: %v\n", err) return err } - var respObj interface{} + err = json.Unmarshal(respBody, &respObj) if err != nil { return err } items := respObj.(map[string]interface{})["items"].([]interface{}) + w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0) if len(items) >= 0 { - fmt.Fprintf(w, "NUM\tNAME\tSUCC\tFAIL\tSTART\tCOMP\tACTIVE\t\n") + fmt.Fprintf(w, "NAME\tACTIVE\tSUCC\tFAIL\tSTART\tCOMP\tPS_NAME\tPS_READY\tPS_TOTAL\t\n") } - for idx, j := range items { + for _, j := range items { jobnameTrainer := j.(map[string]interface{})["metadata"].(map[string]interface{})["name"].(string) jobnameParts := strings.Split(jobnameTrainer, "-") jobname := strings.Join(jobnameParts[0:len(jobnameParts)-1], "-") + // get info for job pservers + var psrsname string + var readyReplicas, replicas interface{} + for _, psrs := range pserverItems { + psrsname = psrs.(map[string]interface{})["metadata"].(map[string]interface{})["name"].(string) + if psrsname == jobname+"-pserver" { + readyReplicas = psrs.(map[string]interface{})["status"].(map[string]interface{})["ready_replicas"] + replicas = psrs.(map[string]interface{})["status"].(map[string]interface{})["replicas"] + break + } + } - fmt.Fprintf(w, "%d\t%s\t%v\t%v\t%v\t%v\t%v\t\n", idx, + fmt.Fprintf(w, "%s\t%v\t%v\t%v\t%v\t%v\t%s\t%v\t%v\t\n", jobname, + j.(map[string]interface{})["status"].(map[string]interface{})["active"], j.(map[string]interface{})["status"].(map[string]interface{})["succeeded"], j.(map[string]interface{})["status"].(map[string]interface{})["failed"], j.(map[string]interface{})["status"].(map[string]interface{})["start_time"], j.(map[string]interface{})["status"].(map[string]interface{})["completion_time"], - j.(map[string]interface{})["status"].(map[string]interface{})["active"]) + psrsname, readyReplicas, replicas) } w.Flush() diff --git a/paddlecloud/paddlecloud/urls.py b/paddlecloud/paddlecloud/urls.py index e2ea96a2..021e81c4 100644 --- a/paddlecloud/paddlecloud/urls.py +++ b/paddlecloud/paddlecloud/urls.py @@ -28,6 +28,7 @@ url(r'^api/sample/$', notebook.views.SampleView.as_view()), url(r"^api/v1/jobs/", paddlejob.views.JobsView.as_view()), + url(r"^api/v1/pservers/", paddlejob.views.PserversView.as_view()), url(r"^api/v1/logs/", paddlejob.views.LogsView.as_view()), url(r"^api/v1/workers/", paddlejob.views.WorkersView.as_view()), url(r"^api/v1/quota/", paddlejob.views.QuotaView.as_view()), diff --git a/paddlecloud/paddlejob/views.py b/paddlecloud/paddlejob/views.py index e9c0c8ad..70413757 100644 --- a/paddlecloud/paddlejob/views.py +++ b/paddlecloud/paddlejob/views.py @@ -228,6 +228,19 @@ def delete(self, request, format=None): retcode = 200 return utils.simple_response(retcode, "\n".join(delete_status)) +class PserversView(APIView): + permission_classes = (permissions.IsAuthenticated,) + + def get(self, request, format=None): + """ + List all pservers + """ + username = request.user.username + namespace = notebook.utils.email_escape(username) + api_instance = client.ExtensionsV1beta1Api(api_client=notebook.utils.get_user_api_client(username)) + pserver_rs_list = api_instance.list_namespaced_replica_set(namespace) + return Response(pserver_rs_list.to_dict()) + class LogsView(APIView): permission_classes = (permissions.IsAuthenticated,) From dde9e8ae81fb9739b81755b5e3eff9c3e3068564 Mon Sep 17 00:00:00 2001 From: wuyi05 Date: Tue, 4 Jul 2017 20:51:36 +0800 Subject: [PATCH 2/2] remove replica set name --- go/paddlecloud/get.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go/paddlecloud/get.go b/go/paddlecloud/get.go index babca708..e86ccef3 100644 --- a/go/paddlecloud/get.go +++ b/go/paddlecloud/get.go @@ -171,7 +171,7 @@ func jobs() error { w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0) if len(items) >= 0 { - fmt.Fprintf(w, "NAME\tACTIVE\tSUCC\tFAIL\tSTART\tCOMP\tPS_NAME\tPS_READY\tPS_TOTAL\t\n") + fmt.Fprintf(w, "NAME\tACTIVE\tSUCC\tFAIL\tSTART\tCOMP\tPS_READY\tPS_TOTAL\t\n") } for _, j := range items { jobnameTrainer := j.(map[string]interface{})["metadata"].(map[string]interface{})["name"].(string) @@ -189,14 +189,14 @@ func jobs() error { } } - fmt.Fprintf(w, "%s\t%v\t%v\t%v\t%v\t%v\t%s\t%v\t%v\t\n", + fmt.Fprintf(w, "%s\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t\n", jobname, j.(map[string]interface{})["status"].(map[string]interface{})["active"], j.(map[string]interface{})["status"].(map[string]interface{})["succeeded"], j.(map[string]interface{})["status"].(map[string]interface{})["failed"], j.(map[string]interface{})["status"].(map[string]interface{})["start_time"], j.(map[string]interface{})["status"].(map[string]interface{})["completion_time"], - psrsname, readyReplicas, replicas) + readyReplicas, replicas) } w.Flush()