diff --git a/go/cmd/paddlecloud/paddlecloud.go b/go/cmd/paddlecloud/paddlecloud.go index be4bf264..01520f3c 100644 --- a/go/cmd/paddlecloud/paddlecloud.go +++ b/go/cmd/paddlecloud/paddlecloud.go @@ -16,6 +16,7 @@ func main() { subcommands.Register(&paddlecloud.LogsCommand{}, "") subcommands.Register(&paddlecloud.GetCommand{}, "") subcommands.Register(&paddlecloud.KillCommand{}, "") + subcommands.Register(&paddlecloud.SimpleFileCmd{}, "") subcommands.Register(&paddlecloud.LsCommand{}, "") subcommands.Register(&paddlecloud.CpCommand{}, "") subcommands.Register(&paddlecloud.RmCommand{}, "") diff --git a/go/paddlecloud/download.go b/go/paddlecloud/download.go index 7ef388ce..aba2bd12 100644 --- a/go/paddlecloud/download.go +++ b/go/paddlecloud/download.go @@ -116,7 +116,7 @@ func downloadFile(src string, srcFileSize int64, dst string) error { dstMeta, err := pfsmod.GetChunkMeta(dst, defaultChunkSize) if err != nil { if os.IsNotExist(err) { - if err := pfsmod.CreateSizedFile(dst, srcFileSize); err != nil { + if err = pfsmod.CreateSizedFile(dst, srcFileSize); err != nil { return err } } else { diff --git a/go/paddlecloud/restclient.go b/go/paddlecloud/restclient.go index 813ca996..65f42510 100644 --- a/go/paddlecloud/restclient.go +++ b/go/paddlecloud/restclient.go @@ -22,14 +22,6 @@ var httpClient = &http.Client{Transport: &http.Transport{}} func makeRequest(uri string, method string, body io.Reader, contentType string, query url.Values, authHeader map[string]string) (*http.Request, error) { - - if query != nil { - uri = fmt.Sprintf("%s?%s", uri, query.Encode()) - log.V(4).Infoln(uri) - } - - log.V(4).Infof("%s %s %T\n", method, uri, body) - req, err := http.NewRequest(method, uri, body) if err != nil { log.Errorf("new request %v\n", err) @@ -47,6 +39,9 @@ func makeRequest(uri string, method string, body io.Reader, req.Header.Set(k, v) } + if query != nil { + req.URL.RawQuery = query.Encode() + } return req, nil } @@ -107,34 +102,31 @@ func DeleteCall(targetURL string, jsonString []byte) ([]byte, error) { } // PostFile make a POST call to HTTP server to upload a file. -func PostFile(targetURL string, filename string) ([]byte, error) { +func PostFile(targetURL string, filename string, query url.Values) ([]byte, error) { bodyBuf := &bytes.Buffer{} bodyWriter := multipart.NewWriter(bodyBuf) - - // this step is very important - fileWriter, err := bodyWriter.CreateFormFile("uploadfile", filename) + fileWriter, err := bodyWriter.CreateFormFile("file", filename) if err != nil { fmt.Fprintf(os.Stderr, "error writing to buffer: %v\n", err) return []byte{}, err } - - // open file handle fh, err := os.Open(filename) + defer fh.Close() if err != nil { fmt.Fprintf(os.Stderr, "error opening file: %v\n", err) return []byte{}, err } - - //iocopy _, err = io.Copy(fileWriter, fh) if err != nil { return []byte{}, err } contentType := bodyWriter.FormDataContentType() - bodyWriter.Close() + if err = bodyWriter.Close(); err != nil { + return []byte{}, err + } - req, err := makeRequestToken(targetURL, "POST", bodyBuf, contentType, nil) + req, err := makeRequestToken(targetURL, "POST", bodyBuf, contentType, query) if err != nil { return []byte{}, err } diff --git a/go/paddlecloud/simplefile.go b/go/paddlecloud/simplefile.go new file mode 100644 index 00000000..aafdd085 --- /dev/null +++ b/go/paddlecloud/simplefile.go @@ -0,0 +1,127 @@ +package paddlecloud + +import ( + "context" + "encoding/json" + "errors" + "flag" + "fmt" + "io" + "net/url" + "os" + "path" + + "github.com/google/subcommands" +) + +// SimpleFileCmd define the subcommand of simple file operations. +type SimpleFileCmd struct { +} + +// Name is subcommands name. +func (*SimpleFileCmd) Name() string { return "file" } + +// Synopsis is subcommands synopsis. +func (*SimpleFileCmd) Synopsis() string { return "Simple file operations." } + +// Usage is subcommands Usage. +func (*SimpleFileCmd) Usage() string { + return `file [put|get] : + Options: +` +} + +// SetFlags registers subcommands flags. +func (p *SimpleFileCmd) SetFlags(f *flag.FlagSet) { +} + +// Execute file ops. +func (p *SimpleFileCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus { + if f.NArg() < 1 || f.NArg() > 3 { + f.Usage() + return subcommands.ExitFailure + } + switch f.Arg(0) { + case "put": + err := putFile(f.Arg(1), f.Arg(2)) + if err != nil { + fmt.Fprintf(os.Stderr, "put file error: %s\n", err) + return subcommands.ExitFailure + } + case "get": + err := getFile(f.Arg(1), f.Arg(2)) + if err != nil { + fmt.Fprintf(os.Stderr, "get file error: %s\n", err) + return subcommands.ExitFailure + } + default: + f.Usage() + return subcommands.ExitFailure + } + return subcommands.ExitSuccess +} + +func putFile(src string, dest string) error { + query := url.Values{} + _, srcFile := path.Split(src) + destDir, destFile := path.Split(dest) + var destFullPath string + if len(destFile) == 0 { + destFullPath = path.Join(destDir, srcFile) + } else { + destFullPath = dest + } + query.Set("path", destFullPath) + respStr, err := PostFile(config.ActiveConfig.Endpoint+"/api/v1/file/", src, query) + if err != nil { + return err + } + var respObj interface{} + if err = json.Unmarshal(respStr, &respObj); err != nil { + return err + } + // FIXME: Print an error if error message is not empty. Use response code instead + errMsg := respObj.(map[string]interface{})["msg"].(string) + if len(errMsg) > 0 { + fmt.Fprintf(os.Stderr, "upload file error: %s\n", errMsg) + } + return nil +} + +func getFile(src string, dest string) error { + query := url.Values{} + query.Set("path", src) + req, err := makeRequestToken(config.ActiveConfig.Endpoint+"/api/v1/file/", "GET", nil, "", query) + if err != nil { + return err + } + resp, err := httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.Status != HTTPOK { + return errors.New("server error: " + resp.Status) + } + _, srcFile := path.Split(src) + destDir, destFile := path.Split(dest) + var destFullPath string + if len(destFile) == 0 { + destFullPath = path.Join(destDir, srcFile) + } else { + destFullPath = dest + } + if _, err = os.Stat(destFullPath); err == nil { + return errors.New("file already exist: " + destFullPath) + } + out, err := os.Create(destFullPath) + if err != nil { + return err + } + defer out.Close() + _, err = io.Copy(out, resp.Body) + if err != nil { + return err + } + return nil +} diff --git a/go/paddlecloud/submit.go b/go/paddlecloud/submit.go index c530eb1c..bce16e28 100644 --- a/go/paddlecloud/submit.go +++ b/go/paddlecloud/submit.go @@ -3,16 +3,18 @@ package paddlecloud import ( "context" "encoding/json" + "errors" "flag" "fmt" "os" + "path" "path/filepath" "github.com/golang/glog" "github.com/google/subcommands" ) -// SubmitCmd define the subcommand of submitting paddle training jobs +// SubmitCmd define the subcommand of submitting paddle training jobs. type SubmitCmd struct { Jobname string `json:"name"` Jobpackage string `json:"jobPackage"` @@ -29,13 +31,13 @@ type SubmitCmd struct { Passes int `json:"passes"` } -// Name is subcommands name +// Name is subcommands name. func (*SubmitCmd) Name() string { return "submit" } -// Synopsis is subcommands synopsis +// Synopsis is subcommands synopsis. func (*SubmitCmd) Synopsis() string { return "Submit job to PaddlePaddle Cloud." } -// Usage is subcommands Usage +// Usage is subcommands Usage. func (*SubmitCmd) Usage() string { return `submit [options] : Submit job to PaddlePaddle Cloud. @@ -43,7 +45,7 @@ func (*SubmitCmd) Usage() string { ` } -// SetFlags registers subcommands flags +// SetFlags registers subcommands flags. func (p *SubmitCmd) SetFlags(f *flag.FlagSet) { f.StringVar(&p.Jobname, "jobname", "paddle-cluster-job", "Cluster job name.") f.IntVar(&p.Parallelism, "parallelism", 1, "Number of parrallel trainers. Defaults to 1.") @@ -53,18 +55,18 @@ func (p *SubmitCmd) SetFlags(f *flag.FlagSet) { f.IntVar(&p.Pservers, "pservers", 0, "Number of parameter servers. Defaults equal to -p") f.IntVar(&p.PSCPU, "pscpu", 1, "Parameter server CPU resource. Defaults to 1.") f.StringVar(&p.PSMemory, "psmemory", "1Gi", "Parameter server momory resource. Defaults to 1Gi.") - f.StringVar(&p.Entry, "entry", "paddle train", "Command of starting trainer process. Defaults to paddle train") + f.StringVar(&p.Entry, "entry", "", "Command of starting trainer process. Defaults to paddle train") f.StringVar(&p.Topology, "topology", "", "Will Be Deprecated .py file contains paddle v1 job configs") f.IntVar(&p.Passes, "passes", 1, "Pass count for training job") } -// Execute submit command +// Execute submit command. func (p *SubmitCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus { if f.NArg() != 1 { f.Usage() return subcommands.ExitFailure } - // default pservers count equals to trainers count + // default pservers count equals to trainers count. if p.Pservers == 0 { p.Pservers = p.Parallelism } @@ -72,7 +74,7 @@ func (p *SubmitCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{} p.Datacenter = config.ActiveConfig.Name s := NewSubmitter(p) - errS := s.Submit(f.Arg(0)) + errS := s.Submit(f.Arg(0), p.Jobname) if errS != nil { fmt.Fprintf(os.Stderr, "error submiting job: %v\n", errS) return subcommands.ExitFailure @@ -81,25 +83,32 @@ func (p *SubmitCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{} return subcommands.ExitSuccess } -// Submitter submit job to cloud +// Submitter submit job to cloud. type Submitter struct { args *SubmitCmd } -// NewSubmitter returns a submitter object +// NewSubmitter returns a submitter object. func NewSubmitter(cmd *SubmitCmd) *Submitter { s := Submitter{cmd} return &s } -// Submit current job -func (s *Submitter) Submit(jobPackage string) error { +// Submit current job. +func (s *Submitter) Submit(jobPackage string, jobName string) error { // 1. upload user job package to pfs - filepath.Walk(jobPackage, func(path string, info os.FileInfo, err error) error { - glog.V(10).Infof("Uploading %s...\n", path) - return nil - //return postFile(path, config.activeConfig.endpoint+"/api/v1/files") + err := filepath.Walk(jobPackage, func(filePath string, info os.FileInfo, err error) error { + if info.IsDir() { + return nil + } + glog.V(10).Infof("Uploading %s...\n", filePath) + dest := path.Join("/pfs", config.ActiveConfig.Name, "home", config.ActiveConfig.Username, "jobs", jobName, filepath.Base(filePath)) + fmt.Printf("uploading: %s...\n", filePath) + return putFile(filePath, dest) }) + if err != nil { + return err + } // 2. call paddlecloud server to create kubernetes job jsonString, err := json.Marshal(s.args) if err != nil { @@ -107,6 +116,17 @@ func (s *Submitter) Submit(jobPackage string) error { } glog.V(10).Infof("Submitting job: %s to %s\n", jsonString, config.ActiveConfig.Endpoint+"/api/v1/jobs") respBody, err := PostCall(config.ActiveConfig.Endpoint+"/api/v1/jobs/", jsonString) - glog.V(10).Infof("got return body size: %d", len(respBody)) - return err + if err != nil { + return err + } + var respObj interface{} + if err = json.Unmarshal(respBody, &respObj); err != nil { + return err + } + // FIXME: Return an error if error message is not empty. Use response code instead + errMsg := respObj.(map[string]interface{})["msg"].(string) + if len(errMsg) > 0 { + return errors.New(errMsg) + } + return nil } diff --git a/paddlecloud/notebook/views.py b/paddlecloud/notebook/views.py index f9363262..cfd59002 100644 --- a/paddlecloud/notebook/views.py +++ b/paddlecloud/notebook/views.py @@ -151,7 +151,10 @@ def create_user_RBAC_permissions(username): "name": username }] } - rbacapi.create_namespaced_role_binding(namespace, body) + try: + rbacapi.create_namespaced_role_binding(namespace, body) + except Exception, e: + logging.error("%s", str(e)) # create service account permissions body = { "apiVersion": "rbac.authorization.k8s.io/v1beta1", @@ -171,7 +174,10 @@ def create_user_RBAC_permissions(username): "namespace": namespace }] } - rbacapi.create_namespaced_role_binding(namespace, body) + try: + rbacapi.create_namespaced_role_binding(namespace, body) + except Exception, e: + logging.error("%s", str(e)) def create_user_namespace(username): v1api = kubernetes.client.CoreV1Api(utils.get_admin_api_client()) diff --git a/paddlecloud/paddlecloud/settings.py b/paddlecloud/paddlecloud/settings.py index 9825f4cc..dcaee078 100644 --- a/paddlecloud/paddlecloud/settings.py +++ b/paddlecloud/paddlecloud/settings.py @@ -278,7 +278,7 @@ FSTYPE_CEPHFS = "cephfs" FSTYPE_HOSTPATH = "hostpath" DATACENTERS = { - "datacenter1":{ + "meiyan":{ "fstype": FSTYPE_CEPHFS, "monitors_addr": ["172.19.32.166:6789"], # must be a list "secret": "ceph-secret", diff --git a/paddlecloud/paddlecloud/urls.py b/paddlecloud/paddlecloud/urls.py index 6679118a..746dd451 100644 --- a/paddlecloud/paddlecloud/urls.py +++ b/paddlecloud/paddlecloud/urls.py @@ -31,6 +31,7 @@ url(r"^api/v1/logs/", paddlejob.views.LogsView.as_view()), url(r"^api/v1/workers/", paddlejob.views.WorkersView.as_view()), url(r"^api/v1/quota/", paddlejob.views.QuotaView.as_view()), + url(r"^api/v1/file/", paddlejob.views.SimpleFileView.as_view()), ] urlpatterns += static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT) diff --git a/paddlecloud/paddlejob/utils.py b/paddlecloud/paddlejob/utils.py index 5feb8d96..45bcba8f 100644 --- a/paddlecloud/paddlejob/utils.py +++ b/paddlecloud/paddlejob/utils.py @@ -4,6 +4,7 @@ from rest_framework import viewsets, generics, permissions from rest_framework.response import Response from rest_framework.views import APIView +import logging first_cap_re = re.compile('(.)([A-Z][a-z]+)') all_cap_re = re.compile('([a-z0-9])([A-Z])') def simple_response(code, msg): @@ -12,6 +13,10 @@ def simple_response(code, msg): "msg": msg }) +def error_message_response(msg): + logging.error("error: %s", msg) + return Response({"msg": msg}) + def convert_camel2snake(data): s1 = first_cap_re.sub(r'\1_\2', name) return all_cap_re.sub(r'\1_\2', s1).lower() diff --git a/paddlecloud/paddlejob/views.py b/paddlecloud/paddlejob/views.py index 8cc5f8f3..3c734f71 100644 --- a/paddlecloud/paddlejob/views.py +++ b/paddlecloud/paddlejob/views.py @@ -8,11 +8,13 @@ from rest_framework import viewsets, generics, permissions from rest_framework.response import Response from rest_framework.views import APIView +from rest_framework.parsers import MultiPartParser, FormParser, FileUploadParser import json import utils import notebook.utils import logging import volume +import os class JobsView(APIView): permission_classes = (permissions.IsAuthenticated,) @@ -89,6 +91,16 @@ def post(self, request, format=None): # get user specified image job_image = obj.get("image", None) gpu_count = obj.get("gpu", 0) + # jobPackage validation: startwith /pfs + # NOTE: job packages are uploaded to /pfs/[dc]/home/[user]/jobs/[jobname] + job_name = obj.get("name", "paddle-cluster-job") + package_in_pod = os.path.join("/pfs/%s/home/%s"%(dc, username), "jobs", job_name) + + # package must be ready before submit a job + current_package_path = package_in_pod.replace("/pfs/%s/home"%dc, settings.STORAGE_PATH) + if not os.path.exists(current_package_path): + return utils.error_message_response("error: package not exist in cloud") + # use default images if not job_image : if gpu_count > 0: @@ -106,8 +118,8 @@ def post(self, request, format=None): )) paddle_job = PaddleJob( - name = obj.get("name", "paddle-cluster-job"), - job_package = obj.get("jobPackage", ""), + name = job_name, + job_package = package_in_pod, parallelism = obj.get("parallelism", 1), cpu = obj.get("cpu", 1), memory = obj.get("memory", "1Gi"), @@ -140,7 +152,7 @@ def post(self, request, format=None): except ApiException, e: logging.error("error submitting trainer job: %s" % e) return utils.simple_response(500, str(e)) - return utils.simple_response(200, "OK") + return utils.simple_response(200, "") def delete(self, request, format=None): """ @@ -276,3 +288,70 @@ def get(self, request, format=None): quota_list = api_client.CoreV1Api(api_cilent=api_client)\ .list_namespaced_resource_quota(namespace) return Response(quota_list.to_dict()) + + +class SimpleFileView(APIView): + permission_classes = (permissions.IsAuthenticated,) + parser_classes = (FormParser, MultiPartParser,) + + def __validate_path(self, request, file_path): + """ + returns error_msg. error_msg will be empty if there's no error. + """ + path_parts = file_path.split(os.path.sep) + + assert(path_parts[1]=="pfs") + assert(path_parts[2] in settings.DATACENTERS.keys()) + assert(path_parts[3] == "home") + assert(path_parts[4] == request.user.username) + + server_file = os.path.join(settings.STORAGE_PATH, request.user.username, *path_parts[5:]) + + return server_file + + def get(self, request, format=None): + """ + Simple get file. + """ + file_path = request.query_params.get("path") + try: + write_file = self.__validate_path(request, file_path) + except Exception, e: + return utils.error_message_response("file path not valid: %s"%str(e)) + + if not os.path.exists(os.sep+write_file): + return Response({"msg": "file not exist"}) + + response = HttpResponse(open(write_file), content_type='application/force-download') + response['Content-Disposition'] = 'attachment; filename="%s"' % os.path.basename(write_file) + + return response + + def post(self, request, format=None): + """ + Simple put file. + """ + file_obj = request.data['file'] + file_path = request.query_params.get("path") + if not file_path: + return utils.error_message_response("must specify path") + try: + write_file = self.__validate_path(request, file_path) + except Exception, e: + return utils.error_message_response("file path not valid: %s"%str(e)) + + if not os.path.exists(os.path.dirname(write_file)): + try: + os.makedirs(os.path.dirname(write_file)) + except OSError as exc: # Guard against race condition + if exc.errno != errno.EEXIST: + raise + # FIXME: always overwrite package files + with open(write_file, "w") as fn: + while True: + data = file_obj.read(4096) + if not data: + break + fn.write(data) + + return Response({"msg": ""})