Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Submit job with upload user's package #128

Merged
merged 8 commits into from
Jun 7, 2017
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/cmd/paddlecloud/paddlecloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func main() {
subcommands.Register(&paddlecloud.LogsCommand{}, "")
subcommands.Register(&paddlecloud.GetCommand{}, "")
subcommands.Register(&paddlecloud.KillCommand{}, "")
subcommands.Register(&paddlecloud.SimpleFileCmd{}, "")

flag.Parse()
ctx := context.Background()
Expand Down
8 changes: 4 additions & 4 deletions go/paddlecloud/restclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ func makeRequest(uri string, method string, body io.Reader,
for k, v := range authHeader {
req.Header.Set(k, v)
}

if query != nil {
req.URL.RawQuery = query.Encode()
}
Expand Down Expand Up @@ -97,19 +96,20 @@ func DeleteCall(targetURL string, jsonString []byte) ([]byte, error) {
}

// PostFile make a POST call to HTTP server to upload a file.
func PostFile(targetURL string, filename string) ([]byte, error) {
func PostFile(targetURL string, filename string, query url.Values) ([]byte, error) {
bodyBuf := &bytes.Buffer{}
bodyWriter := multipart.NewWriter(bodyBuf)

// this step is very important
fileWriter, err := bodyWriter.CreateFormFile("uploadfile", filename)
fileWriter, err := bodyWriter.CreateFormFile("file", filename)
if err != nil {
fmt.Fprintf(os.Stderr, "error writing to buffer: %v\n", err)
return []byte{}, err
}

// open file handle
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment may not be useful.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

fh, err := os.Open(filename)
defer fh.Close()
if err != nil {
fmt.Fprintf(os.Stderr, "error opening file: %v\n", err)
return []byte{}, err
Expand All @@ -124,7 +124,7 @@ func PostFile(targetURL string, filename string) ([]byte, error) {
contentType := bodyWriter.FormDataContentType()
bodyWriter.Close()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to check the return value of Close()?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


req, err := makeRequestToken(targetURL, "POST", bodyBuf, contentType, nil)
req, err := makeRequestToken(targetURL, "POST", bodyBuf, contentType, query)
if err != nil {
return []byte{}, err
}
Expand Down
127 changes: 127 additions & 0 deletions go/paddlecloud/simplefile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package paddlecloud

import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"net/url"
"os"
"path"

"github.com/google/subcommands"
)

// SimpleFileCmd defines the subcommand for simple file operations.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment should be a sentence, which ends with ".".

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

type SimpleFileCmd struct {
}

// Name reports the name of this subcommand.
func (*SimpleFileCmd) Name() string {
	const name = "file"
	return name
}

// Synopsis reports a one-line description of this subcommand.
func (*SimpleFileCmd) Synopsis() string {
	const synopsis = "Simple file operations."
	return synopsis
}

// Usage reports the multi-line help text of this subcommand: the invocation
// form followed by an (currently empty) options section.
func (*SimpleFileCmd) Usage() string {
	const usage = `file [put|get] <src> <dst>:
Options:
`
	return usage
}

// SetFlags is a no-op: this subcommand registers no flags on the given
// flag set.
func (*SimpleFileCmd) SetFlags(*flag.FlagSet) {}

// Execute runs the file subcommand. The first positional argument selects the
// operation and the following ones are its operands:
//
//	put <src> <dst>   upload src to dst; both operands are required.
//	get <src> [<dst>] download src; when dst is omitted f.Arg returns "" and
//	                  getFile falls back to the base name of src.
//
// Any other operation or operand count prints the usage text and returns
// subcommands.ExitFailure, instead of handing empty paths to putFile/getFile.
// A failing operation is reported on stderr and also returns ExitFailure.
func (p *SimpleFileCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus {
	nargs := f.NArg()
	switch {
	case f.Arg(0) == "put" && nargs == 3:
		if err := putFile(f.Arg(1), f.Arg(2)); err != nil {
			fmt.Fprintf(os.Stderr, "put file error: %s\n", err)
			return subcommands.ExitFailure
		}
	case f.Arg(0) == "get" && (nargs == 2 || nargs == 3):
		if err := getFile(f.Arg(1), f.Arg(2)); err != nil {
			fmt.Fprintf(os.Stderr, "get file error: %s\n", err)
			return subcommands.ExitFailure
		}
	default:
		// Unknown operation, or missing/extra operands.
		f.Usage()
		return subcommands.ExitFailure
	}
	return subcommands.ExitSuccess
}

func putFile(src string, dest string) error {
query := url.Values{}
_, srcFile := path.Split(src)
destDir, destFile := path.Split(dest)
var destFullPath string
if len(destFile) == 0 {
destFullPath = path.Join(destDir, srcFile)
} else {
destFullPath = dest
}
query.Set("path", destFullPath)
respStr, err := PostFile(config.ActiveConfig.Endpoint+"/api/v1/file/", src, query)
if err != nil {
return err
}
var respObj interface{}
if err = json.Unmarshal(respStr, &respObj); err != nil {
return err
}
// FIXME: Print an error if error message is not empty. Use response code instead
Copy link
Collaborator

@gongweibao gongweibao Jun 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is response code? Is it a TODO?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an enhancement to unify the response JSON format; see issue #129.

errMsg := respObj.(map[string]interface{})["msg"].(string)
if len(errMsg) > 0 {
fmt.Fprintf(os.Stderr, "upload file error: %s\n", errMsg)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return an error?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning an error will break the filepath.Walk call. I will fix this after the above issue is resolved.

}
return nil
}

// getFile downloads the remote file src from the "/api/v1/file/" endpoint of
// the active configuration and stores it locally at dest.
//
// If dest ends in a slash (or is empty) it is treated as a directory and the
// base name of src is appended. An existing destination is never overwritten:
// the call fails with "file already exist: <path>".
//
// It returns an error when the request cannot be built or sent, when the
// response status differs from HTTPOK, when the destination cannot be created,
// or when writing or closing the local file fails. If the download fails
// part-way, the partially written file is removed so a retry is not blocked.
func getFile(src string, dest string) error {
	query := url.Values{}
	query.Set("path", src)
	req, err := makeRequestToken(config.ActiveConfig.Endpoint+"/api/v1/file/", "GET", nil, "", query)
	if err != nil {
		return err
	}
	resp, err := httpClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.Status != HTTPOK {
		return errors.New("server error: " + resp.Status)
	}

	_, srcFile := path.Split(src)
	destDir, destFile := path.Split(dest)
	destFullPath := dest
	if len(destFile) == 0 {
		destFullPath = path.Join(destDir, srcFile)
	}

	// O_EXCL makes "fail if it exists" and "create" a single atomic step, so a
	// file that appears between a separate check and create cannot be truncated.
	out, err := os.OpenFile(destFullPath, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0666)
	if err != nil {
		if os.IsExist(err) {
			return errors.New("file already exist: " + destFullPath)
		}
		return err
	}
	if _, err = io.Copy(out, resp.Body); err != nil {
		// Best-effort cleanup of the file this call created; the copy error is
		// the one worth reporting, so cleanup errors are deliberately ignored.
		out.Close()
		os.Remove(destFullPath)
		return err
	}
	// Close can surface a delayed write error; do not drop it.
	return out.Close()
}
34 changes: 27 additions & 7 deletions go/paddlecloud/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package paddlecloud
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"os"
"path"
"path/filepath"

"github.com/golang/glog"
Expand Down Expand Up @@ -53,7 +55,7 @@ func (p *SubmitCmd) SetFlags(f *flag.FlagSet) {
f.IntVar(&p.Pservers, "pservers", 0, "Number of parameter servers. Defaults equal to -p")
f.IntVar(&p.PSCPU, "pscpu", 1, "Parameter server CPU resource. Defaults to 1.")
f.StringVar(&p.PSMemory, "psmemory", "1Gi", "Parameter server momory resource. Defaults to 1Gi.")
f.StringVar(&p.Entry, "entry", "paddle train", "Command of starting trainer process. Defaults to paddle train")
f.StringVar(&p.Entry, "entry", "", "Command of starting trainer process. Defaults to paddle train")
f.StringVar(&p.Topology, "topology", "", "Will Be Deprecated .py file contains paddle v1 job configs")
f.IntVar(&p.Passes, "passes", 1, "Pass count for training job")
}
Expand Down Expand Up @@ -95,18 +97,36 @@ func NewSubmitter(cmd *SubmitCmd) *Submitter {
// Submit current job
func (s *Submitter) Submit(jobPackage string) error {
// 1. upload user job package to pfs
filepath.Walk(jobPackage, func(path string, info os.FileInfo, err error) error {
glog.V(10).Infof("Uploading %s...\n", path)
return nil
//return postFile(path, config.activeConfig.endpoint+"/api/v1/files")
err := filepath.Walk(jobPackage, func(filePath string, info os.FileInfo, err error) error {
if info.IsDir() {
return nil
}
glog.V(10).Infof("Uploading %s...\n", filePath)
dest := "/" + path.Join("pfs", config.ActiveConfig.Name, "home", config.ActiveConfig.Username, filePath)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dest := "/" + path.Join("pfs", config.ActiveConfig.Name, ...
==>
dest := path.Join("/pfs", config.ActiveConfig.Name, ...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool. Done.

fmt.Printf("uploading: %s...\n", filePath)
return putFile(filePath, dest)
})
if err != nil {
return err
}
// 2. call paddlecloud server to create kubernetes job
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is not useful.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Talk offline. OK!

jsonString, err := json.Marshal(s.args)
if err != nil {
return err
}
glog.V(10).Infof("Submitting job: %s to %s\n", jsonString, config.ActiveConfig.Endpoint+"/api/v1/jobs")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to specify /api/v1/jobs to be constant?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

REST APIs are currently all v1, change this when we have v2

respBody, err := PostCall(config.ActiveConfig.Endpoint+"/api/v1/jobs/", jsonString)
glog.V(10).Infof("got return body size: %d", len(respBody))
return err
if err != nil {
return err
}
var respObj interface{}
if err = json.Unmarshal(respBody, &respObj); err != nil {
return err
}
// FIXME: Return an error if error message is not empty. Use response code instead
errMsg := respObj.(map[string]interface{})["msg"].(string)
if len(errMsg) > 0 {
return errors.New(errMsg)
}
return nil
}
1 change: 1 addition & 0 deletions paddlecloud/paddlecloud/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
url(r"^api/v1/logs/", paddlejob.views.LogsView.as_view()),
url(r"^api/v1/workers/", paddlejob.views.WorkersView.as_view()),
url(r"^api/v1/quota/", paddlejob.views.QuotaView.as_view()),
url(r"^api/v1/file/", paddlejob.views.SimpleFileView.as_view()),
]

urlpatterns += static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)
Expand Down
5 changes: 5 additions & 0 deletions paddlecloud/paddlejob/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from rest_framework import viewsets, generics, permissions
from rest_framework.response import Response
from rest_framework.views import APIView
import logging
first_cap_re = re.compile('(.)([A-Z][a-z]+)')
all_cap_re = re.compile('([a-z0-9])([A-Z])')
def simple_response(code, msg):
Expand All @@ -12,6 +13,10 @@ def simple_response(code, msg):
"msg": msg
})

def error_message_response(msg):
    """Log ``msg`` at error level and return it wrapped in a Response.

    The response body is a dict with the single key ``"msg"``.
    """
    payload = {"msg": msg}
    logging.error("error: %s", msg)
    return Response(payload)

def convert_camel2snake(data):
    """Convert a CamelCase string to snake_case.

    An underscore is inserted before each capital letter that starts a
    capitalized word (``first_cap_re``) and between a lowercase letter or
    digit and a following capital (``all_cap_re``); the result is lowercased.

    :param data: the string to convert.
    :return: the converted, lowercased string.
    """
    # The body previously read an undefined ``name`` instead of the ``data``
    # parameter, which raised NameError on every call.
    s1 = first_cap_re.sub(r'\1_\2', data)
    return all_cap_re.sub(r'\1_\2', s1).lower()
90 changes: 88 additions & 2 deletions paddlecloud/paddlejob/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
from rest_framework import viewsets, generics, permissions
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.parsers import MultiPartParser, FormParser, FileUploadParser
import json
import utils
import notebook.utils
import logging
import volume
import os

class JobsView(APIView):
permission_classes = (permissions.IsAuthenticated,)
Expand Down Expand Up @@ -89,6 +91,23 @@ def post(self, request, format=None):
# get user specified image
job_image = obj.get("image", None)
gpu_count = obj.get("gpu", 0)
# jobPackage validation: startwith /pfs
# NOTE: always overwrite the job package when the directory exists
job_package =obj.get("jobPackage", "")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this logic, users can pass the jobPackage parameter as /pfs/xxx/home/yyy, as /xxxx, or as a relative path. Having so many formats for jobPackage is confusing; maybe we should support only one format, such as "/pfs/$dc/home/$username". I think that would be clearer.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After a brief discussion, we decided to use "jobname" as the default path for storing the package content, so that users can find historical packages and program file folders when they need to look something up.

if not job_package.startswith("/pfs"):
# add /pfs... cloud path
if job_package.startswith("/"):
# use last dirname as package name
package_in_pod = os.path.join("/pfs/%s/home/%s"%(dc, username), os.path.basename(job_package))
else:
package_in_pod = os.path.join("/pfs/%s/home/%s"%(dc, username), job_package)
else:
package_in_pod = job_package
# package must be ready before submit a job
current_package_path = package_in_pod.replace("/pfs/%s/home"%dc, settings.STORAGE_PATH)
if not os.path.exists(current_package_path):
return utils.error_message_response("error: package not exist in cloud")

# use default images
if not job_image :
if gpu_count > 0:
Expand All @@ -107,7 +126,7 @@ def post(self, request, format=None):

paddle_job = PaddleJob(
name = obj.get("name", "paddle-cluster-job"),
job_package = obj.get("jobPackage", ""),
job_package = package_in_pod,
parallelism = obj.get("parallelism", 1),
cpu = obj.get("cpu", 1),
memory = obj.get("memory", "1Gi"),
Expand Down Expand Up @@ -140,7 +159,7 @@ def post(self, request, format=None):
except ApiException, e:
logging.error("error submitting trainer job: %s" % e)
return utils.simple_response(500, str(e))
return utils.simple_response(200, "OK")
return utils.simple_response(200, "")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this change is not necessary.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed the paddlecloud client to check whether msg is non-empty, which means an error occurred.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I got it. I'll make a note to use the HTTP status instead of checking msg.


def delete(self, request, format=None):
"""
Expand Down Expand Up @@ -276,3 +295,70 @@ def get(self, request, format=None):
quota_list = api_client.CoreV1Api(api_cilent=api_client)\
.list_namespaced_resource_quota(namespace)
return Response(quota_list.to_dict())


class SimpleFileView(APIView):
permission_classes = (permissions.IsAuthenticated,)
parser_classes = (FormParser, MultiPartParser,)

def __validate_path(self, request, file_path):
"""
returns error_msg. error_msg will be empty if there's no error
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A sentence should end with '.'.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

"""
path_parts = file_path.split(os.path.sep)

assert(path_parts[1]=="pfs")
assert(path_parts[2] in settings.DATACENTERS.keys())
assert(path_parts[3] == "home")
assert(path_parts[4] == request.user.username)

server_file = os.path.join(settings.STORAGE_PATH, request.user.username, *path_parts[5:])

return server_file

def get(self, request, format=None):
"""
Simple down file
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

down=>download

Copy link
Collaborator Author

@typhoonzero typhoonzero Jun 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to get and put

"""
file_path = request.query_params.get("path")
try:
write_file = self.__validate_path(request, file_path)
except Exception, e:
return utils.error_message_response("file path not valid: %s"%str(e))

if not os.path.exists(os.sep+write_file):
return Response({"msg": "file not exist"})

response = HttpResponse(open(write_file), content_type='application/force-download')
response['Content-Disposition'] = 'attachment; filename="%s"' % os.path.basename(write_file)

return response

def post(self, request, format=None):
"""
Simple up file
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

up=>upload

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to get and put

"""
file_obj = request.data['file']
file_path = request.query_params.get("path")
if not file_path:
return utils.error_message_response("must specify path")
try:
write_file = self.__validate_path(request, file_path)
except Exception, e:
return utils.error_message_response("file path not valid: %s"%str(e))

if not os.path.exists(os.path.dirname(write_file)):
try:
os.makedirs(os.path.dirname(write_file))
except OSError as exc: # Guard against race condition
if exc.errno != errno.EEXIST:
raise
# FIXME: always overwrite package files
with open(write_file, "w") as fn:
while True:
data = file_obj.read(4096)
if not data:
break
fn.write(data)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check for errors that may occur.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Django will catch exceptions and return a default response.


return Response({"msg": ""})