From 0a0c5f05c0d7f0e8b0d61ebc27fecd275d99afe4 Mon Sep 17 00:00:00 2001 From: wuyi05 Date: Wed, 31 May 2017 20:08:18 +0800 Subject: [PATCH 1/5] simple file --- go/paddlecloud/restclient.go | 55 ++++++++++++++++++++++++++++----- paddlecloud/paddlecloud/urls.py | 1 + paddlecloud/paddlejob/utils.py | 4 +++ paddlecloud/paddlejob/views.py | 51 ++++++++++++++++++++++++++++++ 4 files changed, 104 insertions(+), 7 deletions(-) diff --git a/go/paddlecloud/restclient.go b/go/paddlecloud/restclient.go index 5fbf7960..b8fb6db0 100644 --- a/go/paddlecloud/restclient.go +++ b/go/paddlecloud/restclient.go @@ -14,22 +14,41 @@ import ( // HTTPOK is ok status of http api call const HTTPOK = "200 OK" -func getCall(targetURL string, query map[string]string, token string) ([]byte, error) { - req, err := http.NewRequest("GET", targetURL, nil) +// makeRESTRequest returns a http request object to do paddlecloud rest requests +func makeRESTRequest(uri string, + method string, + query map[string]string, + body io.Reader, + contentType string, + token string) (*http.Request, error) { + + req, err := http.NewRequest(method, uri, body) if err != nil { - return []byte{}, err + return nil, err } - req.Header.Set("Content-Type", "application/json") + // set default content type + if len(contentType) == 0 { + contentType = "application/json" + } + req.Header.Set("Content-Type", contentType) + // add auth token to request headers if len(token) > 0 { req.Header.Set("Authorization", "Token "+token) } - + // add GET query params q := req.URL.Query() for k, v := range query { q.Add(k, v) } req.URL.RawQuery = q.Encode() + return req, nil +} +func getCall(targetURL string, query map[string]string, token string) ([]byte, error) { + req, err := makeRESTRequest(targetURL, "GET", query, nil, "", token) + if err != nil { + return []byte{}, err + } client := &http.Client{} resp, err := client.Do(req) if err != nil { @@ -89,7 +108,7 @@ func deleteCall(jsonString []byte, targetURL string, token string) ([]byte, erro return body, err } -func postFile(filename string, targetURL string) error { +func postFile(filename string, targetURL string, query map[string]string, token string) error { bodyBuf := &bytes.Buffer{} bodyWriter := multipart.NewWriter(bodyBuf) @@ -116,11 +135,31 @@ func postFile(filename string, targetURL string) error { contentType := bodyWriter.FormDataContentType() bodyWriter.Close() - resp, err := http.Post(targetURL, contentType, bodyBuf) + req, err := http.NewRequest("POST", targetURL, bodyBuf) + if err != nil { + return err + } + req.Header.Set("Content-Type", contentType) + if len(token) > 0 { + req.Header.Set("Authorization", "Token "+token) + } + + q := req.URL.Query() + for k, v := range query { + q.Add(k, v) + } + req.URL.RawQuery = q.Encode() + + client := &http.Client{} + resp, err := client.Do(req) if err != nil { return err } defer resp.Body.Close() + if resp.Status != HTTPOK { + return errors.New("http server returned non-200 status: " + resp.Status) + } + respBody, err := ioutil.ReadAll(resp.Body) if err != nil { return err @@ -129,3 +168,5 @@ func postFile(filename string, targetURL string) error { fmt.Println(string(respBody)) return nil } + +func getFile() diff --git a/paddlecloud/paddlecloud/urls.py b/paddlecloud/paddlecloud/urls.py index 6679118a..746dd451 100644 --- a/paddlecloud/paddlecloud/urls.py +++ b/paddlecloud/paddlecloud/urls.py @@ -31,6 +31,7 @@ url(r"^api/v1/logs/", paddlejob.views.LogsView.as_view()), url(r"^api/v1/workers/", paddlejob.views.WorkersView.as_view()), url(r"^api/v1/quota/", paddlejob.views.QuotaView.as_view()), + url(r"^api/v1/file/", paddlejob.views.SimpleFileView.as_view()), ] urlpatterns += static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT) diff --git a/paddlecloud/paddlejob/utils.py b/paddlecloud/paddlejob/utils.py index 5feb8d96..0ce9d68c 100644 --- a/paddlecloud/paddlejob/utils.py +++ b/paddlecloud/paddlejob/utils.py @@ -12,6 +12,10 @@ def simple_response(code, msg): "msg": msg }) +def error_message_response(msg): + logging.error("error: %s", msg) + return Response({"msg": msg}) + def convert_camel2snake(data): s1 = first_cap_re.sub(r'\1_\2', name) return all_cap_re.sub(r'\1_\2', s1).lower() diff --git a/paddlecloud/paddlejob/views.py b/paddlecloud/paddlejob/views.py index d5964bbf..71974ebb 100644 --- a/paddlecloud/paddlejob/views.py +++ b/paddlecloud/paddlejob/views.py @@ -257,3 +257,54 @@ def get(self, request, format=None): quota_list = api_client.CoreV1Api(api_cilent=api_client)\ .list_namespaced_resource_quota(namespace) return Response(quota_list.to_dict()) + + +class SimpleFileView(APIView): + permission_classes = (permissions.IsAuthenticated,) + + def __validate_path(self, file_path): + """ + returns error_msg. error_msg will be empty if there's no error + """ + path_parts = file_path.split(os.path.sep) + + assert(path_parts[1]=="pfs") + assert(path_parts[2] in settings.DATACENTERS.keys()) + assert(path_parts[3] == "home") + assert(path_parts[4] == request.user.username) + + server_file = os.path.join(settings.STORAGE_PATH, request.user.username, *path_parts[5:]) + if os.path.exists(os.sep+write_file): + return "file already exist" + return server_file + + def get(self, request, format=None): + """ + Simple down file + """ + file_path = request.query_params.get("path") + try: + write_file = self.__validate_path(file_path) + except Exception, e: + return utils.error_message_response("file path not valid: %s", e) + response = HttpResponse(FileWrapper(open(write_file)), content_type='text/plain') + response['Content-Disposition'] = 'attachment; filename="%s"' % os.path.basename(write_file) + + return response + + def post(self, request, format=None): + """ + Simple up file + """ + file_obj = request.FILES['file'] + file_path = request.query_params.get("path") + if not file_path: + return utils.error_message_response("must specify path") + try: + write_file = self.__validate_path(file_path) + except Exception, e: + return utils.error_message_response("file path not valid: %s", e) + with open(os.sep+write_file, "w") as fn: + fn.write(file_obj.read()) + + return Response({"msg": "OK"}) From a82a6458aaa975131576a71ce077f792a1db60e3 Mon Sep 17 00:00:00 2001 From: wuyi05 Date: Fri, 2 Jun 2017 19:39:25 +0800 Subject: [PATCH 2/5] submit with put package --- go/paddlecloud/simplefile.go | 119 +++++++++++++++++++++++++++++++++++ go/paddlecloud/submit.go | 9 +-- 2 files changed, 124 insertions(+), 4 deletions(-) create mode 100644 go/paddlecloud/simplefile.go diff --git a/go/paddlecloud/simplefile.go b/go/paddlecloud/simplefile.go new file mode 100644 index 00000000..c1dfe48a --- /dev/null +++ b/go/paddlecloud/simplefile.go @@ -0,0 +1,119 @@ +package paddlecloud + +import ( + "context" + "errors" + "flag" + "fmt" + "io" + "net/http" + "os" + "path" + + "github.com/google/subcommands" +) + +// SimpleFileCmd define the subcommand of simple file operations +type SimpleFileCmd struct { +} + +// Name is subcommands name +func (*SimpleFileCmd) Name() string { return "file" } + +// Synopsis is subcommands synopsis +func (*SimpleFileCmd) Synopsis() string { return "Simple file operations." } + +// Usage is subcommands Usage +func (*SimpleFileCmd) Usage() string { + return `file [put|get] : + Options: +` +} + +// SetFlags registers subcommands flags +func (p *SimpleFileCmd) SetFlags(f *flag.FlagSet) { +} + +// Execute file ops +func (p *SimpleFileCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus { + if f.NArg() < 1 || f.NArg() > 3 { + f.Usage() + return subcommands.ExitFailure + } + switch f.Arg(0) { + case "put": + err := putFile(f.Arg(1), f.Arg(2)) + if err != nil { + fmt.Fprintf(os.Stderr, "put file error: %s\n", err) + return subcommands.ExitFailure + } + case "get": + err := getFile(f.Arg(1), f.Arg(2)) + if err != nil { + fmt.Fprintf(os.Stderr, "get file error: %s\n", err) + return subcommands.ExitFailure + } + default: + f.Usage() + return subcommands.ExitFailure + } + return subcommands.ExitSuccess +} + +func putFile(src string, dest string) error { + query := make(map[string]string) + _, srcFile := path.Split(src) + destDir, destFile := path.Split(dest) + var destFullPath string + if len(destFile) == 0 { + destFullPath = path.Join(destDir, srcFile) + } else { + destFullPath = dest + } + query["path"] = destFullPath + respStr, err := PostFile(config.ActiveConfig.Endpoint+"/api/v1/file/", src, query) + if err != nil { + return err + } + fmt.Printf("%s\n", respStr) + return nil +} + +func getFile(src string, dest string) error { + query := make(map[string]string) + query["path"] = src + req, err := makeRequestToken(config.ActiveConfig.Endpoint+"/api/v1/file/", "GET", nil, "", query) + if err != nil { + return err + } + client := &http.Client{Transport: httpTransport} + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.Status != HTTPOK { + return errors.New("server error: " + resp.Status) + } + _, srcFile := path.Split(src) + destDir, destFile := path.Split(dest) + var destFullPath string + if len(destFile) == 0 { + destFullPath = path.Join(destDir, srcFile) + } else { + destFullPath = dest + } + if _, err = os.Stat(destFullPath); err == nil { + return errors.New("file already exist: " + destFullPath) + } + out, err := os.Create(destFullPath) + if err != nil { + return err + } + defer out.Close() + _, err = io.Copy(out, resp.Body) + if err != nil { + return err + } + return nil +} diff --git a/go/paddlecloud/submit.go b/go/paddlecloud/submit.go index c530eb1c..e86f1fed 100644 --- a/go/paddlecloud/submit.go +++ b/go/paddlecloud/submit.go @@ -6,6 +6,7 @@ import ( "flag" "fmt" "os" + "path" "path/filepath" "github.com/golang/glog" @@ -95,10 +96,10 @@ func NewSubmitter(cmd *SubmitCmd) *Submitter { // Submit current job func (s *Submitter) Submit(jobPackage string) error { // 1. upload user job package to pfs - filepath.Walk(jobPackage, func(path string, info os.FileInfo, err error) error { - glog.V(10).Infof("Uploading %s...\n", path) - return nil - //return postFile(path, config.activeConfig.endpoint+"/api/v1/files") + filepath.Walk(jobPackage, func(filePath string, info os.FileInfo, err error) error { + glog.V(10).Infof("Uploading %s...\n", filePath) + dest := path.Join("pfs", config.ActiveConfig.Name, "home", config.ActiveConfig.Username, filePath) + return putFile(filePath, dest) }) // 2. call paddlecloud server to create kubernetes job jsonString, err := json.Marshal(s.args) From e6be893fc6564679b1759427993e336415f1bd23 Mon Sep 17 00:00:00 2001 From: wuyi05 Date: Tue, 6 Jun 2017 19:39:07 +0800 Subject: [PATCH 3/5] submit job and upload package --- go/paddlecloud/simplefile.go | 11 +++++++- go/paddlecloud/submit.go | 29 ++++++++++++++++---- paddlecloud/paddlejob/views.py | 48 +++++++++++++++++++++++++--------- 3 files changed, 69 insertions(+), 19 deletions(-) diff --git a/go/paddlecloud/simplefile.go b/go/paddlecloud/simplefile.go index c1dfe48a..4e325fb5 100644 --- a/go/paddlecloud/simplefile.go +++ b/go/paddlecloud/simplefile.go @@ -2,6 +2,7 @@ package paddlecloud import ( "context" + "encoding/json" "errors" "flag" "fmt" @@ -75,7 +76,15 @@ func putFile(src string, dest string) error { if err != nil { return err } - fmt.Printf("%s\n", respStr) + var respObj interface{} + if err = json.Unmarshal(respStr, &respObj); err != nil { + return err + } + // FIXME: Print an error if error message is not empty. Use response code instead + errMsg := respObj.(map[string]interface{})["msg"].(string) + if len(errMsg) > 0 { + fmt.Fprintf(os.Stderr, "upload file error: %s\n", errMsg) + } return nil } diff --git a/go/paddlecloud/submit.go b/go/paddlecloud/submit.go index e86f1fed..16a8067a 100644 --- a/go/paddlecloud/submit.go +++ b/go/paddlecloud/submit.go @@ -3,6 +3,7 @@ package paddlecloud import ( "context" "encoding/json" + "errors" "flag" "fmt" "os" @@ -54,7 +55,7 @@ func (p *SubmitCmd) SetFlags(f *flag.FlagSet) { f.IntVar(&p.Pservers, "pservers", 0, "Number of parameter servers. Defaults equal to -p") f.IntVar(&p.PSCPU, "pscpu", 1, "Parameter server CPU resource. Defaults to 1.") f.StringVar(&p.PSMemory, "psmemory", "1Gi", "Parameter server momory resource. Defaults to 1Gi.") - f.StringVar(&p.Entry, "entry", "paddle train", "Command of starting trainer process. Defaults to paddle train") + f.StringVar(&p.Entry, "entry", "", "Command of starting trainer process. Defaults to paddle train") f.StringVar(&p.Topology, "topology", "", "Will Be Deprecated .py file contains paddle v1 job configs") f.IntVar(&p.Passes, "passes", 1, "Pass count for training job") } @@ -96,11 +97,18 @@ func NewSubmitter(cmd *SubmitCmd) *Submitter { // Submit current job func (s *Submitter) Submit(jobPackage string) error { // 1. upload user job package to pfs - filepath.Walk(jobPackage, func(filePath string, info os.FileInfo, err error) error { + err := filepath.Walk(jobPackage, func(filePath string, info os.FileInfo, err error) error { + if info.IsDir() { + return nil + } glog.V(10).Infof("Uploading %s...\n", filePath) - dest := path.Join("pfs", config.ActiveConfig.Name, "home", config.ActiveConfig.Username, filePath) + dest := "/" + path.Join("pfs", config.ActiveConfig.Name, "home", config.ActiveConfig.Username, filePath) + fmt.Printf("uploading: %s...\n", filePath) return putFile(filePath, dest) }) + if err != nil { + return err + } // 2. call paddlecloud server to create kubernetes job jsonString, err := json.Marshal(s.args) if err != nil { @@ -108,6 +116,17 @@ func (s *Submitter) Submit(jobPackage string) error { } glog.V(10).Infof("Submitting job: %s to %s\n", jsonString, config.ActiveConfig.Endpoint+"/api/v1/jobs") respBody, err := PostCall(config.ActiveConfig.Endpoint+"/api/v1/jobs/", jsonString) - glog.V(10).Infof("got return body size: %d", len(respBody)) - return err + if err != nil { + return err + } + var respObj interface{} + if err = json.Unmarshal(respBody, &respObj); err != nil { + return err + } + // FIXME: Return an error if error message is not empty. Use response code instead + errMsg := respObj.(map[string]interface{})["msg"].(string) + if len(errMsg) > 0 { + return errors.New(errMsg) + } + return nil } diff --git a/paddlecloud/paddlejob/views.py b/paddlecloud/paddlejob/views.py index 030b1b41..ac662c3c 100644 --- a/paddlecloud/paddlejob/views.py +++ b/paddlecloud/paddlejob/views.py @@ -72,6 +72,23 @@ def post(self, request, format=None): # get user specified image job_image = obj.get("image", None) gpu_count = obj.get("gpu", 0) + # jobPackage validation: startwith /pfs + # NOTE: always overwrite the job package when the directory exists + job_package =obj.get("jobPackage", "") + if not job_package.startswith("/pfs"): + # add /pfs... cloud path + if job_package.startswith("/"): + # use last dirname as package name + package_in_pod = os.path.join("/pfs/%s/home/%s"%(dc, username), os.path.basename(job_package)) + else: + package_in_pod = os.path.join("/pfs/%s/home/%s"%(dc, username), job_package) + else: + package_in_pod = job_package + # package must be ready before submit a job + current_package_path = package_in_pod.replace("/pfs/%s/home"%dc, settings.STORAGE_PATH) + if not os.path.exists(current_package_path): + return utils.error_message_response("error: package not exist in cloud") + # use default images if not job_image : if gpu_count > 0: @@ -90,7 +107,7 @@ def post(self, request, format=None): paddle_job = PaddleJob( name = obj.get("name", "paddle-cluster-job"), - job_package = obj.get("jobPackage", ""), + job_package = package_in_pod, parallelism = obj.get("parallelism", 1), cpu = obj.get("cpu", 1), memory = obj.get("memory", "1Gi"), @@ -123,7 +140,7 @@ def post(self, request, format=None): except ApiException, e: logging.error("error submitting trainer job: %s" % e) return utils.simple_response(500, str(e)) - return utils.simple_response(200, "OK") + return utils.simple_response(200, "") def delete(self, request, format=None): """ @@ -288,7 +305,7 @@ def get(self, request, format=None): try: write_file = self.__validate_path(request, file_path) except Exception, e: - return utils.error_message_response("file path not valid: %s", e) + return utils.error_message_response("file path not valid: %s"%str(e)) if not os.path.exists(os.sep+write_file): return Response({"msg": "file not exist"}) @@ -309,15 +326,20 @@ def post(self, request, format=None): try: write_file = self.__validate_path(request, file_path) except Exception, e: - return utils.error_message_response("file path not valid: %s"%e) - - if os.path.exists(os.sep+write_file): - return Response({"msg": "file already exist"}) - - with open(os.sep+write_file, "w") as fn: - for chunk in file_obj.read(4096): - if not chunk: + return utils.error_message_response("file path not valid: %s"%str(e)) + + if not os.path.exists(os.path.dirname(write_file)): + try: + os.makedirs(os.path.dirname(write_file)) + except OSError as exc: # Guard against race condition + if exc.errno != errno.EEXIST: + raise + # FIXME: always overwrite package files + with open(write_file, "w") as fn: + while True: + data = file_obj.read(4096) + if not data: break - fn.write(chunk) + fn.write(data) - return Response({"msg": "OK"}) + return Response({"msg": ""}) From a9521e64bd146bfe0f60f726eb55d84ab5a3235d Mon Sep 17 00:00:00 2001 From: wuyi05 Date: Wed, 7 Jun 2017 10:57:16 +0800 Subject: [PATCH 4/5] update --- go/paddlecloud/restclient.go | 14 +++++--------- go/paddlecloud/simplefile.go | 12 ++++++------ go/paddlecloud/submit.go | 20 ++++++++++---------- paddlecloud/paddlejob/views.py | 6 +++--- 4 files changed, 24 insertions(+), 28 deletions(-) diff --git a/go/paddlecloud/restclient.go b/go/paddlecloud/restclient.go index b8e314e3..af5c687c 100644 --- a/go/paddlecloud/restclient.go +++ b/go/paddlecloud/restclient.go @@ -99,30 +99,26 @@ func DeleteCall(targetURL string, jsonString []byte) ([]byte, error) { func PostFile(targetURL string, filename string, query url.Values) ([]byte, error) { bodyBuf := &bytes.Buffer{} bodyWriter := multipart.NewWriter(bodyBuf) - - // this step is very important fileWriter, err := bodyWriter.CreateFormFile("file", filename) if err != nil { fmt.Fprintf(os.Stderr, "error writing to buffer: %v\n", err) return []byte{}, err } - - // open file handle fh, err := os.Open(filename) defer fh.Close() if err != nil { fmt.Fprintf(os.Stderr, "error opening file: %v\n", err) return []byte{}, err } - - //iocopy _, err = io.Copy(fileWriter, fh) if err != nil { return []byte{}, err } contentType := bodyWriter.FormDataContentType() - bodyWriter.Close() + if err = bodyWriter.Close(); err != nil { + return []byte{}, err + } req, err := makeRequestToken(targetURL, "POST", bodyBuf, contentType, query) if err != nil { @@ -131,7 +127,7 @@ func PostFile(targetURL string, filename string, query url.Values) ([]byte, erro return getResponse(req) } -// PostChunkData makes a POST call to HTTP server to upload chunkdata. +// PostChunk makes a POST call to HTTP server to upload chunkdata. func PostChunk(targetURL string, chunkName string, reader io.Reader, len int64, boundary string) ([]byte, error) { body := &bytes.Buffer{} @@ -161,7 +157,7 @@ func PostChunk(targetURL string, return getResponse(req) } -// GetChunkData makes a GET call to HTTP server to download chunk data. +// GetChunk makes a GET call to HTTP server to download chunk data. func GetChunk(targetURL string, query url.Values) (*http.Response, error) { req, err := makeRequestToken(targetURL, "GET", nil, "", query) diff --git a/go/paddlecloud/simplefile.go b/go/paddlecloud/simplefile.go index f35b0839..aafdd085 100644 --- a/go/paddlecloud/simplefile.go +++ b/go/paddlecloud/simplefile.go @@ -14,28 +14,28 @@ import ( "github.com/google/subcommands" ) -// SimpleFileCmd define the subcommand of simple file operations +// SimpleFileCmd define the subcommand of simple file operations. type SimpleFileCmd struct { } -// Name is subcommands name +// Name is subcommands name. func (*SimpleFileCmd) Name() string { return "file" } -// Synopsis is subcommands synopsis +// Synopsis is subcommands synopsis. func (*SimpleFileCmd) Synopsis() string { return "Simple file operations." } -// Usage is subcommands Usage +// Usage is subcommands Usage. func (*SimpleFileCmd) Usage() string { return `file [put|get] : Options: ` } -// SetFlags registers subcommands flags +// SetFlags registers subcommands flags. func (p *SimpleFileCmd) SetFlags(f *flag.FlagSet) { } -// Execute file ops +// Execute file ops. func (p *SimpleFileCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus { if f.NArg() < 1 || f.NArg() > 3 { f.Usage() diff --git a/go/paddlecloud/submit.go b/go/paddlecloud/submit.go index 16a8067a..0dbb391c 100644 --- a/go/paddlecloud/submit.go +++ b/go/paddlecloud/submit.go @@ -14,7 +14,7 @@ import ( "github.com/google/subcommands" ) -// SubmitCmd define the subcommand of submitting paddle training jobs +// SubmitCmd define the subcommand of submitting paddle training jobs. type SubmitCmd struct { Jobname string `json:"name"` Jobpackage string `json:"jobPackage"` @@ -31,13 +31,13 @@ type SubmitCmd struct { Passes int `json:"passes"` } -// Name is subcommands name +// Name is subcommands name. func (*SubmitCmd) Name() string { return "submit" } -// Synopsis is subcommands synopsis +// Synopsis is subcommands synopsis. func (*SubmitCmd) Synopsis() string { return "Submit job to PaddlePaddle Cloud." } -// Usage is subcommands Usage +// Usage is subcommands Usage. func (*SubmitCmd) Usage() string { return `submit [options] : Submit job to PaddlePaddle Cloud. @@ -45,7 +45,7 @@ func (*SubmitCmd) Usage() string { ` } -// SetFlags registers subcommands flags +// SetFlags registers subcommands flags. func (p *SubmitCmd) SetFlags(f *flag.FlagSet) { f.StringVar(&p.Jobname, "jobname", "paddle-cluster-job", "Cluster job name.") f.IntVar(&p.Parallelism, "parallelism", 1, "Number of parrallel trainers. Defaults to 1.") @@ -60,13 +60,13 @@ func (p *SubmitCmd) SetFlags(f *flag.FlagSet) { f.IntVar(&p.Passes, "passes", 1, "Pass count for training job") } -// Execute submit command +// Execute submit command. func (p *SubmitCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus { if f.NArg() != 1 { f.Usage() return subcommands.ExitFailure } - // default pservers count equals to trainers count + // default pservers count equals to trainers count. if p.Pservers == 0 { p.Pservers = p.Parallelism } @@ -83,18 +83,18 @@ func (p *SubmitCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{} return subcommands.ExitSuccess } -// Submitter submit job to cloud +// Submitter submit job to cloud. type Submitter struct { args *SubmitCmd } -// NewSubmitter returns a submitter object +// NewSubmitter returns a submitter object. func NewSubmitter(cmd *SubmitCmd) *Submitter { s := Submitter{cmd} return &s } -// Submit current job +// Submit current job. func (s *Submitter) Submit(jobPackage string) error { // 1. upload user job package to pfs err := filepath.Walk(jobPackage, func(filePath string, info os.FileInfo, err error) error { diff --git a/paddlecloud/paddlejob/views.py b/paddlecloud/paddlejob/views.py index fba629f8..7816fdb9 100644 --- a/paddlecloud/paddlejob/views.py +++ b/paddlecloud/paddlejob/views.py @@ -303,7 +303,7 @@ class SimpleFileView(APIView): def __validate_path(self, request, file_path): """ - returns error_msg. error_msg will be empty if there's no error + returns error_msg. error_msg will be empty if there's no error. """ path_parts = file_path.split(os.path.sep) @@ -318,7 +318,7 @@ def __validate_path(self, request, file_path): def get(self, request, format=None): """ - Simple down file + Simple get file. """ file_path = request.query_params.get("path") try: @@ -336,7 +336,7 @@ def get(self, request, format=None): def post(self, request, format=None): """ - Simple up file + Simple put file. """ file_obj = request.data['file'] file_path = request.query_params.get("path") From add7f0dec2ceb0878bf14606bbdf3a31d6eefc25 Mon Sep 17 00:00:00 2001 From: wuyi05 Date: Wed, 7 Jun 2017 15:58:19 +0800 Subject: [PATCH 5/5] submit package to jobname directory --- go/paddlecloud/submit.go | 6 +++--- paddlecloud/paddlecloud/settings.py | 2 +- paddlecloud/paddlejob/views.py | 17 +++++------------ 3 files changed, 9 insertions(+), 16 deletions(-) diff --git a/go/paddlecloud/submit.go b/go/paddlecloud/submit.go index 0dbb391c..bce16e28 100644 --- a/go/paddlecloud/submit.go +++ b/go/paddlecloud/submit.go @@ -74,7 +74,7 @@ func (p *SubmitCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{} p.Datacenter = config.ActiveConfig.Name s := NewSubmitter(p) - errS := s.Submit(f.Arg(0)) + errS := s.Submit(f.Arg(0), p.Jobname) if errS != nil { fmt.Fprintf(os.Stderr, "error submiting job: %v\n", errS) return subcommands.ExitFailure @@ -95,14 +95,14 @@ func NewSubmitter(cmd *SubmitCmd) *Submitter { } // Submit current job. -func (s *Submitter) Submit(jobPackage string) error { +func (s *Submitter) Submit(jobPackage string, jobName string) error { // 1. upload user job package to pfs err := filepath.Walk(jobPackage, func(filePath string, info os.FileInfo, err error) error { if info.IsDir() { return nil } glog.V(10).Infof("Uploading %s...\n", filePath) - dest := "/" + path.Join("pfs", config.ActiveConfig.Name, "home", config.ActiveConfig.Username, filePath) + dest := path.Join("/pfs", config.ActiveConfig.Name, "home", config.ActiveConfig.Username, "jobs", jobName, filepath.Base(filePath)) fmt.Printf("uploading: %s...\n", filePath) return putFile(filePath, dest) }) diff --git a/paddlecloud/paddlecloud/settings.py b/paddlecloud/paddlecloud/settings.py index 9825f4cc..dcaee078 100644 --- a/paddlecloud/paddlecloud/settings.py +++ b/paddlecloud/paddlecloud/settings.py @@ -278,7 +278,7 @@ FSTYPE_CEPHFS = "cephfs" FSTYPE_HOSTPATH = "hostpath" DATACENTERS = { - "datacenter1":{ + "meiyan":{ "fstype": FSTYPE_CEPHFS, "monitors_addr": ["172.19.32.166:6789"], # must be a list "secret": "ceph-secret", diff --git a/paddlecloud/paddlejob/views.py b/paddlecloud/paddlejob/views.py index 7816fdb9..3c734f71 100644 --- a/paddlecloud/paddlejob/views.py +++ b/paddlecloud/paddlejob/views.py @@ -92,17 +92,10 @@ def post(self, request, format=None): job_image = obj.get("image", None) gpu_count = obj.get("gpu", 0) # jobPackage validation: startwith /pfs - # NOTE: always overwrite the job package when the directory exists - job_package =obj.get("jobPackage", "") - if not job_package.startswith("/pfs"): - # add /pfs... cloud path - if job_package.startswith("/"): - # use last dirname as package name - package_in_pod = os.path.join("/pfs/%s/home/%s"%(dc, username), os.path.basename(job_package)) - else: - package_in_pod = os.path.join("/pfs/%s/home/%s"%(dc, username), job_package) - else: - package_in_pod = job_package + # NOTE: job packages are uploaded to /pfs/[dc]/home/[user]/jobs/[jobname] + job_name = obj.get("name", "paddle-cluster-job") + package_in_pod = os.path.join("/pfs/%s/home/%s"%(dc, username), "jobs", job_name) + # package must be ready before submit a job current_package_path = package_in_pod.replace("/pfs/%s/home"%dc, settings.STORAGE_PATH) if not os.path.exists(current_package_path): @@ -125,7 +118,7 @@ def post(self, request, format=None): )) paddle_job = PaddleJob( - name = obj.get("name", "paddle-cluster-job"), + name = job_name, job_package = package_in_pod, parallelism = obj.get("parallelism", 1), cpu = obj.get("cpu", 1),