diff --git a/go/cmd/paddlecloud/paddlecloud.go b/go/cmd/paddlecloud/paddlecloud.go index 14a06426..be4bf264 100644 --- a/go/cmd/paddlecloud/paddlecloud.go +++ b/go/cmd/paddlecloud/paddlecloud.go @@ -16,6 +16,10 @@ func main() { subcommands.Register(&paddlecloud.LogsCommand{}, "") subcommands.Register(&paddlecloud.GetCommand{}, "") subcommands.Register(&paddlecloud.KillCommand{}, "") + subcommands.Register(&paddlecloud.LsCommand{}, "") + subcommands.Register(&paddlecloud.CpCommand{}, "") + subcommands.Register(&paddlecloud.RmCommand{}, "") + subcommands.Register(&paddlecloud.MkdirCommand{}, "") flag.Parse() ctx := context.Background() diff --git a/go/cmd/pfsserver/main.go b/go/cmd/pfsserver/main.go new file mode 100644 index 00000000..71445b6e --- /dev/null +++ b/go/cmd/pfsserver/main.go @@ -0,0 +1,24 @@ +package main + +import ( + "flag" + "fmt" + "net/http" + + "github.com/PaddlePaddle/cloud/go/filemanager/pfsserver" + log "github.com/golang/glog" +) + +func main() { + port := flag.Int("port", 8080, "port of server") + ip := flag.String("ip", "0.0.0.0", "ip of server") + tokenUri := flag.String("tokenuri", "http://cloud.paddlepaddle.org", "uri of token server") + flag.Parse() + + router := pfsserver.NewRouter() + addr := fmt.Sprintf("%s:%d", *ip, *port) + pfsserver.TokenUri = *tokenUri + + log.Infof("server on:%s and tokenuri:%s\n", addr, *tokenUri) + log.Fatal(http.ListenAndServe(addr, router)) +} diff --git a/go/filemanager/pfsmodules/chunk.go b/go/filemanager/pfsmodules/chunk.go new file mode 100644 index 00000000..164a5ad9 --- /dev/null +++ b/go/filemanager/pfsmodules/chunk.go @@ -0,0 +1,98 @@ +package pfsmodules + +import ( + "errors" + "fmt" + "io" + "net/url" + "os" + "strconv" + + log "github.com/golang/glog" +) + +// Chunk respresents a chunk info. +type Chunk struct { + Path string + Offset int64 + Size int64 +} + +// ToURLParam encodes variables to url encoding parameters. +func (p *Chunk) ToURLParam() url.Values { + parameters := url.Values{} + parameters.Add("path", p.Path) + + str := fmt.Sprint(p.Offset) + parameters.Add("offset", str) + + str = fmt.Sprint(p.Size) + parameters.Add("chunksize", str) + + return parameters +} + +// ParseChunk get a Chunk struct from path. +// path example: +// path=/pfs/datacenter1/1.txt&offset=4096&chunksize=4096 +func ParseChunk(path string) (*Chunk, error) { + cmd := Chunk{} + + m, err := url.ParseQuery(path) + if err != nil || + len(m["path"]) == 0 || + len(m["offset"]) == 0 || + len(m["chunksize"]) == 0 { + return nil, errors.New(StatusJSONErr) + } + + cmd.Path = m["path"][0] + cmd.Offset, err = strconv.ParseInt(m["offset"][0], 10, 64) + if err != nil { + return nil, errors.New(StatusJSONErr) + } + + chunkSize, err := strconv.ParseInt(m["chunksize"][0], 10, 64) + if err != nil { + return nil, errors.New(StatusBadChunkSize) + } + cmd.Size = chunkSize + + return &cmd, nil +} + +// LoadChunkData loads a specified chunk to io.Writer. +func (p *Chunk) LoadChunkData(w io.Writer) error { + f, err := os.Open(p.Path) + if err != nil { + return err + } + defer Close(f) + + _, err = f.Seek(p.Offset, 0) + if err != nil { + return err + } + + loaded, err := io.CopyN(w, f, p.Size) + log.V(2).Infof("loaded:%d\n", loaded) + return err +} + +// SaveChunkData save data from io.Reader. +func (p *Chunk) SaveChunkData(r io.Reader) error { + f, err := os.OpenFile(p.Path, os.O_WRONLY, 0600) + if err != nil { + return err + } + defer Close(f) + + _, err = f.Seek(p.Offset, 0) + if err != nil { + return err + } + + writen, err := io.CopyN(f, r, p.Size) + log.V(2).Infof("chunksize:%d writen:%d\n", p.Size, writen) + return err +} diff --git a/go/filemanager/pfsmodules/chunkmeta.go b/go/filemanager/pfsmodules/chunkmeta.go new file mode 100644 index 00000000..1df294e1 --- /dev/null +++ b/go/filemanager/pfsmodules/chunkmeta.go @@ -0,0 +1,203 @@ +package pfsmodules + +import ( + "crypto/md5" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "os" + "sort" + "strconv" +) + +const ( + defaultMaxChunkSize = 4 * 1024 * 1024 + defaultMinChunkSize = 4 * 1024 +) +const ( + ChunkMetaCmdName = "GetChunkMeta" +) + +// ChunkMeta holds the chunk meta's info. +type ChunkMeta struct { + Offset int64 `json:"offset"` + Checksum string `json:"checksum"` + Len int64 `json:"len"` +} + +// ChunkMetaCmd is a command. +type ChunkMetaCmd struct { + Method string `json:"method"` + FilePath string `json:"path"` + ChunkSize int64 `json:"chunksize"` +} + +// ToURLParam encodes ChunkMetaCmd to URL encoding string. +func (p *ChunkMetaCmd) ToURLParam() url.Values { + parameters := url.Values{} + parameters.Add("method", p.Method) + parameters.Add("path", p.FilePath) + + str := fmt.Sprint(p.ChunkSize) + parameters.Add("chunksize", str) + + return parameters +} + +// ToJSON encodes ChunkMetaCmd to JSON string. +func (p *ChunkMetaCmd) ToJSON() ([]byte, error) { + return json.Marshal(p) +} + +// Run is a functions which run ChunkMetaCmd. +func (p *ChunkMetaCmd) Run() (interface{}, error) { + return GetChunkMeta(p.FilePath, p.ChunkSize) +} + +func (p *ChunkMetaCmd) checkChunkSize() error { + if p.ChunkSize < defaultMinChunkSize || + p.ChunkSize > defaultMaxChunkSize { + return errors.New(StatusBadChunkSize) + } + + return nil +} + +// CloudCheck checks the conditions when running on cloud. +func (p *ChunkMetaCmd) ValidateCloudArgs(userName string) error { + if err := ValidatePfsPath([]string{p.FilePath}, userName); err != nil { + return err + } + + return p.checkChunkSize() +} + +// LocalCheck checks the conditions when running locally. +func (p *ChunkMetaCmd) ValidateLocalArgs() error { + return p.checkChunkSize() +} + +// NewChunkMetaCmdFromURLParams get a new ChunkMetaCmd. +func NewChunkMetaCmdFromURLParam(r *http.Request) (*ChunkMetaCmd, error) { + method := r.URL.Query().Get("method") + path := r.URL.Query().Get("path") + chunkStr := r.URL.Query().Get("chunksize") + + if len(method) == 0 || + method != ChunkMetaCmdName || + len(path) == 0 || + len(chunkStr) == 0 { + return nil, errors.New(http.StatusText(http.StatusBadRequest)) + } + + chunkSize, err := strconv.ParseInt(chunkStr, 10, 64) + if err != nil { + return nil, errors.New(StatusBadChunkSize) + } + + return &ChunkMetaCmd{ + Method: method, + FilePath: path, + ChunkSize: chunkSize, + }, nil +} + +type metaSlice []ChunkMeta + +func (a metaSlice) Len() int { return len(a) } +func (a metaSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a metaSlice) Less(i, j int) bool { return a[i].Offset < a[j].Offset } + +// GetDiffChunkMeta gets difference between srcMeta and dstMeta. +func GetDiffChunkMeta(srcMeta []ChunkMeta, dstMeta []ChunkMeta) ([]ChunkMeta, error) { + if len(dstMeta) == 0 || len(srcMeta) == 0 { + return srcMeta, nil + } + + if !sort.IsSorted(metaSlice(srcMeta)) { + sort.Sort(metaSlice(srcMeta)) + } + + if !sort.IsSorted(metaSlice(dstMeta)) { + sort.Sort(metaSlice(dstMeta)) + } + + dstIdx := 0 + srcIdx := 0 + diff := make([]ChunkMeta, 0, len(srcMeta)) + + for { + if srcMeta[srcIdx].Offset < dstMeta[dstIdx].Offset { + diff = append(diff, srcMeta[srcIdx]) + srcIdx++ + } else if srcMeta[srcIdx].Offset > dstMeta[dstIdx].Offset { + dstIdx++ + } else { + if srcMeta[srcIdx].Checksum != dstMeta[dstIdx].Checksum { + diff = append(diff, srcMeta[srcIdx]) + } + + dstIdx++ + srcIdx++ + } + + if dstIdx >= len(dstMeta) { + break + } + + if srcIdx >= len(srcMeta) { + break + } + } + + if srcIdx < len(srcMeta) { + diff = append(diff, srcMeta[srcIdx:]...) + } + + return diff, nil +} + +// GetChunkMeta gets chunk metas from path of file. +func GetChunkMeta(path string, len int64) ([]ChunkMeta, error) { + f, err := os.Open(path) + if err != nil { + return nil, err + } + defer Close(f) + + if len > defaultMaxChunkSize || len < defaultMinChunkSize { + return nil, errors.New(StatusBadChunkSize) + } + + var metas []ChunkMeta + + data := make([]byte, len) + offset := int64(0) + + for { + n, err := f.Read(data) + if err != nil && err != io.EOF { + return nil, err + } + + if err == io.EOF { + break + } + + m := ChunkMeta{} + m.Offset = offset + sum := md5.Sum(data[:n]) + m.Checksum = hex.EncodeToString(sum[:]) + m.Len = int64(n) + + metas = append(metas, m) + + offset += int64(n) + } + + return metas, nil +} diff --git a/go/filemanager/pfsmodules/command.go b/go/filemanager/pfsmodules/command.go new file mode 100644 index 00000000..a7a07a0d --- /dev/null +++ b/go/filemanager/pfsmodules/command.go @@ -0,0 +1,76 @@ +package pfsmodules + +import ( + "errors" + "io" + "net/url" + "strings" + + log "github.com/golang/glog" +) + +const ( + // DefaultMultiPartBoundary is the default multipart form boudary. + DefaultMultiPartBoundary = "8d7b0e5709d756e21e971ff4d9ac3b20" + + // MaxJSONRequestSize is the max body size when server receives a request. + MaxJSONRequestSize = 2048 +) + +// Command is a interface of all commands. +type Command interface { + // ToURLParam generates url.Values of the command struct. + ToURLParam() url.Values + // ToJSON generates JSON string of the command struct. + ToJSON() ([]byte, error) + // Run runs a command. + Run() (interface{}, error) + // ValidateLocalArgs validates arguments when running locally. + ValidateLocalArgs() error + // ValidateCloudArgs validates arguments when running on cloud. + ValidateCloudArgs(userName string) error +} + +// CheckUser checks if a user has authority to access a path. +func checkUser(path string, user string) error { + a := strings.Split(path, "/") + if len(a) < 3 { + return errors.New(StatusBadPath) + } + + if a[3] != user { + return errors.New(StatusUnAuthorized) + } + return nil +} + +// IsCloudPath returns whether a path is a pfspath. +func ValidatePfsPath(paths []string, userName string) error { + if len(paths) == 0 { + return errors.New(StatusNotEnoughArgs) + } + + for _, path := range paths { + if !strings.HasPrefix(path, "/pfs/") { + return errors.New(StatusShouldBePfsPath + ":" + path) + } + + if err := checkUser(path, userName); err != nil { + return errors.New(StatusShouldBePfsPath + ":" + path) + } + } + return nil +} + +// IsCloudPath returns whether a path is a pfspath. +func IsCloudPath(path string) bool { + return strings.HasPrefix(path, "/pfs/") +} + +// Close closes c and log it. +func Close(c io.Closer) { + err := c.Close() + if err != nil { + log.Error(err) + } +} diff --git a/go/filemanager/pfsmodules/cp.go b/go/filemanager/pfsmodules/cp.go new file mode 100644 index 00000000..06402e67 --- /dev/null +++ b/go/filemanager/pfsmodules/cp.go @@ -0,0 +1,69 @@ +package pfsmodules + +import ( + "flag" + "fmt" + "strconv" + + log "github.com/golang/glog" +) + +const ( + cpCmdName = "cp" +) + +// CpCmdResult means the copy-command's result. +type CpCmdResult struct { + Src string `json:"Path"` + Dst string `json:"Dst"` +} + +// CpCmd means copy-command. +type CpCmd struct { + Method string + V bool + Src []string + Dst string +} + +// NewCpCmdFromFlag returns a new CpCmd from parsed flags. +func NewCpCmdFromFlag(f *flag.FlagSet) (*CpCmd, error) { + cmd := CpCmd{} + + cmd.Method = cpCmdName + cmd.Src = make([]string, 0, f.NArg()) + + var err error + f.Visit(func(flag *flag.Flag) { + if flag.Name == "v" { + cmd.V, err = strconv.ParseBool(flag.Value.String()) + if err != nil { + log.Errorln("meets error when parsing argument v") + return + } + } + }) + + if err != nil { + return nil, err + } + + for i, arg := range f.Args() { + if i >= len(f.Args())-1 { + break + } + cmd.Src = append(cmd.Src, arg) + } + + cmd.Dst = f.Args()[len(f.Args())-1] + + return &cmd, nil +} + +// PartToString prints command's info. +func (p *CpCmd) PartToString(src, dst string) string { + if p.V { + return fmt.Sprintf("cp -v %s %s\n", src, dst) + } + return fmt.Sprintf("cp %s %s\n", src, dst) +} diff --git a/go/filemanager/pfsmodules/ls.go b/go/filemanager/pfsmodules/ls.go new file mode 100644 index 00000000..cc984b81 --- /dev/null +++ b/go/filemanager/pfsmodules/ls.go @@ -0,0 +1,194 @@ +package pfsmodules + +import ( + "errors" + "flag" + "net/http" + "net/url" + "os" + "path/filepath" + "strconv" + + log "github.com/golang/glog" +) + +const ( + lsCmdName = "ls" +) + +// LsResult represents a LsCmd's result. +type LsResult struct { + Path string `json:"Path"` + ModTime int64 `json:"ModTime"` + Size int64 `json:"Size"` + IsDir bool `json:"IsDir"` +} + +// LsCmd means LsCommand structure. +type LsCmd struct { + Method string + R bool + Args []string +} + +// ToURLParam encoding LsCmd to URL Encoding string. +func (p *LsCmd) ToURLParam() url.Values { + parameters := url.Values{} + parameters.Add("method", p.Method) + parameters.Add("r", strconv.FormatBool(p.R)) + + for _, arg := range p.Args { + parameters.Add("arg", arg) + } + + return parameters +} + +// ToJSON does't need to be implemented. +func (p *LsCmd) ToJSON() ([]byte, error) { + panic("not implemented") +} + +// NewLsCmdFromFlag returen a new LsCmd. +func NewLsCmdFromFlag(f *flag.FlagSet) (*LsCmd, error) { + cmd := LsCmd{} + + cmd.Method = lsCmdName + cmd.Args = make([]string, 0, f.NArg()) + + var err error + f.Visit(func(flag *flag.Flag) { + if flag.Name == "r" { + cmd.R, err = strconv.ParseBool(flag.Value.String()) + if err != nil { + log.Errorln("meets error when parsing argument r") + return + } + } + }) + + if err != nil { + return nil, err + } + + for _, arg := range f.Args() { + log.V(2).Info(arg) + cmd.Args = append(cmd.Args, arg) + } + + return &cmd, nil +} + +// NewLsCmdFromURLParam returns a new LsCmd according path variable. +func NewLsCmdFromURLParam(path string) (*LsCmd, error) { + cmd := LsCmd{} + + m, err := url.ParseQuery(path) + if err != nil { + return nil, err + } + + if len(m["method"]) == 0 || + len(m["r"]) == 0 || + len(m["arg"]) == 0 { + return nil, errors.New(StatusNotEnoughArgs) + } + + cmd.Method = m["method"][0] + if cmd.Method != lsCmdName { + return nil, errors.New(http.StatusText(http.StatusMethodNotAllowed) + ":" + cmd.Method) + } + + cmd.R, err = strconv.ParseBool(m["r"][0]) + if err != nil { + return nil, errors.New(StatusInvalidArgs + ":r") + } + + cmd.Args = m["arg"] + + return &cmd, nil +} + +// NewLsCmd return a new LsCmd according r and path variable. +func NewLsCmd(r bool, path string) *LsCmd { + return &LsCmd{ + Method: lsCmdName, + R: r, + Args: []string{path}, + } +} + +func lsPath(path string, r bool) ([]LsResult, error) { + var ret []LsResult + + err := filepath.Walk(path, func(subpath string, info os.FileInfo, err error) error { + + if err != nil { + return err + } + + m := LsResult{} + m.Path = subpath + m.Size = info.Size() + m.ModTime = info.ModTime().UnixNano() + m.IsDir = info.IsDir() + + if subpath == path { + if info.IsDir() { + } else { + ret = append(ret, m) + } + } else { + ret = append(ret, m) + } + + if info.IsDir() && !r && subpath != path { + return filepath.SkipDir + } + + return nil + }) + + return ret, err +} + +// CloudCheck checks the conditions when running on cloud. +func (p *LsCmd) ValidateCloudArgs(userName string) error { + return ValidatePfsPath(p.Args, userName) +} + +// LocalCheck checks the conditions when running local. +func (p *LsCmd) ValidateLocalArgs() error { + if len(p.Args) == 0 { + return errors.New(StatusNotEnoughArgs) + } + return nil +} + +// Run functions runs LsCmd and return LsResult and error if any happened. +func (p *LsCmd) Run() (interface{}, error) { + var results []LsResult + + for _, arg := range p.Args { + log.V(1).Infof("ls %s\n", arg) + + list, err := filepath.Glob(arg) + if err != nil { + return nil, err + } + + if len(list) == 0 { + return results, errors.New(StatusFileNotFound) + } + + for _, path := range list { + ret, err := lsPath(path, p.R) + if err != nil { + return results, err + } + results = append(results, ret...) + } + } + + return results, nil +} diff --git a/go/filemanager/pfsmodules/mkdir.go b/go/filemanager/pfsmodules/mkdir.go new file mode 100644 index 00000000..2c8d5557 --- /dev/null +++ b/go/filemanager/pfsmodules/mkdir.go @@ -0,0 +1,92 @@ +package pfsmodules + +import ( + "encoding/json" + "errors" + "flag" + "net/url" + "os" + + log "github.com/golang/glog" +) + +const ( + mkdirCmdName = "mkdir" +) + +// MkdirResult means Mkdir command's result. +type MkdirResult struct { + Path string `json:"path"` +} + +// MkdirCmd means Mkdir command. +type MkdirCmd struct { + Method string `json:"method"` + Args []string `json:"path"` +} + +// LocalCheck checks the conditions when running on local. +func (p *MkdirCmd) ValidateLocalArgs() error { + if len(p.Args) == 0 { + return errors.New(StatusNotEnoughArgs) + } + return nil +} + +// CloudCheck checks the conditions when running on cloud. +func (p *MkdirCmd) ValidateCloudArgs(userName string) error { + return ValidatePfsPath(p.Args, userName) +} + +// ToURLParam need not to be implemented. +func (p *MkdirCmd) ToURLParam() url.Values { + panic("not implemented") +} + +// ToJSON encodes MkdirCmd to JSON string. +func (p *MkdirCmd) ToJSON() ([]byte, error) { + return json.Marshal(p) +} + +// NewMkdirCmd returns a new MkdirCmd. +func NewMkdirCmd(path string) *MkdirCmd { + return &MkdirCmd{ + Method: mkdirCmdName, + Args: []string{path}, + } +} + +// NewMkdirCmdFromFlag returns a new MkdirCmd from parsed flags. +func NewMkdirCmdFromFlag(f *flag.FlagSet) (*MkdirCmd, error) { + cmd := MkdirCmd{} + + cmd.Method = mkdirCmdName + cmd.Args = make([]string, 0, f.NArg()) + + for _, arg := range f.Args() { + log.V(2).Info(arg) + cmd.Args = append(cmd.Args, arg) + } + + return &cmd, nil +} + +// Run runs MkdirCmd. +func (p *MkdirCmd) Run() (interface{}, error) { + var results []MkdirResult + for _, path := range p.Args { + fi, err := os.Stat(path) + + if os.IsExist(err) && !fi.IsDir() { + return results, errors.New(StatusAlreadyExist) + } + + if err := os.MkdirAll(path, 0700); err != nil { + return results, err + } + + results = append(results, MkdirResult{Path: path}) + } + + return results, nil +} diff --git a/go/filemanager/pfsmodules/rm.go b/go/filemanager/pfsmodules/rm.go new file mode 100644 index 00000000..d116e6b3 --- /dev/null +++ b/go/filemanager/pfsmodules/rm.go @@ -0,0 +1,122 @@ +package pfsmodules + +import ( + "encoding/json" + "errors" + "flag" + "net/url" + "os" + "path/filepath" + "strconv" + + log "github.com/golang/glog" +) + +const ( + rmCmdName = "rm" +) + +// RmResult means Rm-command's result. +type RmResult struct { + Path string `json:"path"` +} + +// RmCmd means Rm command. +type RmCmd struct { + Method string `json:"method"` + R bool `json:"r"` + Args []string `json:"path"` +} + +// LocalCheck checks the conditions when running local. +func (p *RmCmd) ValidateLocalArgs() error { + if len(p.Args) == 0 { + return errors.New(StatusInvalidArgs) + } + return nil +} + +// CloudCheck checks the conditions when running on cloud. +func (p *RmCmd) ValidateCloudArgs(userName string) error { + return ValidatePfsPath(p.Args, userName) +} + +// ToURLParam needs not to be implemented. +func (p *RmCmd) ToURLParam() url.Values { + panic("not implemented") +} + +// ToJSON encodes RmCmd to JSON string. +func (p *RmCmd) ToJSON() ([]byte, error) { + return json.Marshal(p) +} + +// NewRmCmd returns a new RmCmd. +func NewRmCmd(r bool, path string) *RmCmd { + return &RmCmd{ + Method: rmCmdName, + R: r, + Args: []string{path}, + } +} + +// NewRmCmdFromFlag returns a new RmCmd from parsed flags. +func NewRmCmdFromFlag(f *flag.FlagSet) (*RmCmd, error) { + cmd := RmCmd{} + + cmd.Method = rmCmdName + cmd.Args = make([]string, 0, f.NArg()) + + var err error + f.Visit(func(flag *flag.Flag) { + if flag.Name == "r" { + cmd.R, err = strconv.ParseBool(flag.Value.String()) + if err != nil { + log.Errorln("meets error when parsing argument r") + return + } + } + }) + + if err != nil { + return nil, err + } + + for _, arg := range f.Args() { + log.V(2).Info(arg) + cmd.Args = append(cmd.Args, arg) + } + + return &cmd, nil +} + +// Run runs RmCmd. +func (p *RmCmd) Run() (interface{}, error) { + var result []RmResult + + for _, path := range p.Args { + list, err := filepath.Glob(path) + if err != nil { + return result, err + } + + for _, arg := range list { + fi, err := os.Stat(arg) + if err != nil { + return result, err + } + + if fi.IsDir() && !p.R { + return result, errors.New(StatusCannotDelDirectory + ":" + arg) + } + + if err := os.RemoveAll(arg); err != nil { + return result, err + } + + result = append(result, RmResult{Path: arg}) + } + } + + return result, nil +} diff --git a/go/filemanager/pfsmodules/stat.go b/go/filemanager/pfsmodules/stat.go new file mode 100644 index 00000000..4e2c229b --- /dev/null +++ b/go/filemanager/pfsmodules/stat.go @@ -0,0 +1,78 @@ +package pfsmodules + +import ( + "errors" + "net/http" + "net/url" + "os" +) + +const ( + StatCmdName = "stat" +) + +// StatCmd means stat command. +type StatCmd struct { + Method string + Path string +} + +// ToURLParam encodes StatCmd to URL Encoding string. +func (p *StatCmd) ToURLParam() url.Values { + parameters := url.Values{} + parameters.Add("method", p.Method) + parameters.Add("path", p.Path) + + return parameters + +} + +// ToJSON here need not tobe implemented. +func (p *StatCmd) ToJSON() ([]byte, error) { + return nil, nil +} + +// NewStatCmdFromURLParam return a new StatCmd. +func NewStatCmdFromURLParam(path string) (*StatCmd, error) { + cmd := StatCmd{} + + m, err := url.ParseQuery(path) + if err != nil || + len(m["method"]) == 0 || + len(m["path"]) == 0 { + return nil, errors.New(StatusNotEnoughArgs) + } + + cmd.Method = m["method"][0] + if cmd.Method != StatCmdName { + return nil, errors.New(http.StatusText(http.StatusMethodNotAllowed) + ":" + cmd.Method) + } + + cmd.Path = m["path"][0] + return &cmd, nil +} + +// LocalCheck checks the condition when running local. +func (p *StatCmd) ValidateLocalArgs() error { + panic("not implement") +} + +// CloudCheck checks the conditions when running on cloud. +func (p *StatCmd) ValidateCloudArgs(userName string) error { + return ValidatePfsPath([]string{p.Path}, userName) +} + +// Run runs the StatCmd. +func (p *StatCmd) Run() (interface{}, error) { + fi, err := os.Stat(p.Path) + if err != nil { + return nil, err + } + + return &LsResult{ + Path: p.Path, + ModTime: fi.ModTime().UnixNano(), + IsDir: fi.IsDir(), + Size: fi.Size(), + }, nil +} diff --git a/go/filemanager/pfsmodules/status.go b/go/filemanager/pfsmodules/status.go new file mode 100644 index 00000000..e664b6b7 --- /dev/null +++ b/go/filemanager/pfsmodules/status.go @@ -0,0 +1,23 @@ +package pfsmodules + +// TODO +// need a custom error type? + +const ( + StatusFileNotFound = "no such file or directory" + StatusDirectoryNotAFile = "should be a file not a directory" + StatusCopyFromLocalToLocal = "don't support copy local to local" + StatusDestShouldBeDirectory = "dest should be a directory" + StatusOnlySupportFiles = "only support upload or download files not directories" + StatusBadFileSize = "bad file size" + StatusDirectoryAlreadyExist = "directory already exist" + StatusBadChunkSize = "chunksize error" + StatusShouldBePfsPath = "should be pfs path" + StatusNotEnoughArgs = "not enough arguments" + StatusInvalidArgs = "invalid arguments" + StatusUnAuthorized = "what you request is unauthorized" + StatusJSONErr = "parse json error" + StatusCannotDelDirectory = "can't del directory" + StatusAlreadyExist = "already exist" + StatusBadPath = "the path should be in format eg:/pf/datacentername/" +) diff --git a/go/filemanager/pfsmodules/touch.go b/go/filemanager/pfsmodules/touch.go new file mode 100644 index 00000000..9ea7da8b --- /dev/null +++ b/go/filemanager/pfsmodules/touch.go @@ -0,0 +1,139 @@ +package pfsmodules + +import ( + "encoding/json" + "errors" + "fmt" + "net/http" + "net/url" + "os" + "strconv" +) + +const ( + defaultMaxCreateFileSize = int64(4 * 1024 * 1024 * 1024) +) + +const ( + TouchCmdName = "touch" +) + +// TouchResult represents touch-command's result. +type TouchResult struct { + Path string `json:"path"` +} + +// TouchCmd is holds touch command's variables. +type TouchCmd struct { + Method string `json:"method"` + FileSize int64 `json:"filesize"` + Path string `json:"path"` +} + +func (p *TouchCmd) checkFileSize() error { + if p.FileSize < 0 || p.FileSize > defaultMaxCreateFileSize { + return errors.New(StatusBadFileSize + ":" + fmt.Sprint(p.FileSize)) + } + return nil +} + +// LocalCheck check the conditions when running local. +func (p *TouchCmd) ValidateLocalArgs() error { + return p.checkFileSize() +} + +// CloudCheck checks the conditions when running on cloud. +func (p *TouchCmd) ValidateCloudArgs(userName string) error { + if err := ValidatePfsPath([]string{p.Path}, userName); err != nil { + return err + } + + return p.checkFileSize() +} + +// ToURLParam encodes a TouchCmd to a URL encoding string. +func (p *TouchCmd) ToURLParam() url.Values { + parameters := url.Values{} + parameters.Add("method", p.Method) + parameters.Add("path", p.Path) + + str := fmt.Sprint(p.FileSize) + parameters.Add("path", str) + + return parameters +} + +// ToJSON encodes a TouchCmd to a JSON string. +func (p *TouchCmd) ToJSON() ([]byte, error) { + return json.Marshal(p) +} + +// NewTouchCmdFromURLParam return a new TouchCmd with specified path. +func NewTouchCmdFromURLParam(path string) (*TouchCmd, int32) { + cmd := TouchCmd{} + + m, err := url.ParseQuery(path) + if err != nil || + len(m["method"]) == 0 || + len(m["filesize"]) == 0 || + len(m["path"]) == 0 { + return nil, http.StatusBadRequest + } + + cmd.Method = m["method"][0] + if cmd.Method != TouchCmdName { + return nil, http.StatusBadRequest + } + + cmd.FileSize, err = strconv.ParseInt(m["filesize"][0], 0, 64) + if err != nil { + return nil, http.StatusBadRequest + } + + cmd.Path = m["path"][0] + + return &cmd, http.StatusOK +} + +// CreateSizedFile creates a file with specified size. +func CreateSizedFile(path string, size int64) error { + fd, err := os.Create(path) + if err != nil { + return err + } + defer Close(fd) + + if size <= 0 { + return nil + } + + _, err = fd.Seek(size-1, 0) + if err != nil { + return err + } + + _, err = fd.Write([]byte{0}) + return err +} + +// Run is a function runs TouchCmd. +func (p *TouchCmd) Run() (interface{}, error) { + if p.FileSize < 0 || p.FileSize > defaultMaxCreateFileSize { + return nil, errors.New(StatusBadFileSize) + } + + fi, err := os.Stat(p.Path) + if os.IsExist(err) && fi.IsDir() { + return nil, errors.New(StatusDirectoryAlreadyExist) + } + + if os.IsNotExist(err) || fi.Size() != p.FileSize { + if err := CreateSizedFile(p.Path, p.FileSize); err != nil { + return nil, err + } + } + + return &TouchResult{ + Path: p.Path, + }, nil +} diff --git a/go/filemanager/pfsserver/handler.go b/go/filemanager/pfsserver/handler.go new file mode 100644 index 00000000..baa827be --- /dev/null +++ b/go/filemanager/pfsserver/handler.go @@ -0,0 +1,428 @@ +package pfsserver + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "mime/multipart" + "net/http" + "net/url" + + pfsmod "github.com/PaddlePaddle/cloud/go/filemanager/pfsmodules" + sjson "github.com/bitly/go-simplejson" + log "github.com/golang/glog" +) + +type response struct { + Err string `json:"err"` + Results interface{} `json:"results"` +} + +func makeRequest(uri string, method string, body io.Reader, + contentType string, query url.Values, + authHeader map[string]string) (*http.Request, error) { + + if query != nil { + uri = fmt.Sprintf("%s?%s", uri, query.Encode()) + log.V(4).Infoln(uri) + } + + log.V(4).Infof("%s %s %T\n", method, uri, body) + req, err := http.NewRequest(method, uri, body) + if err != nil { + log.Errorf("new request %v\n", err) + return nil, err + } + + // default contentType is application/json. + if len(contentType) == 0 { + req.Header.Set("Content-Type", "application/json") + } else { + req.Header.Set("Content-Type", contentType) + } + + for k, v := range authHeader { + req.Header.Set(k, v) + } + + return req, nil +} + +func getResponse(req *http.Request) ([]byte, error) { + client := &http.Client{} + + resp, err := client.Do(req) + if err != nil { + log.Errorf("httpClient do error %v\n", err) + return []byte{}, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return []byte{}, errors.New("server error: " + resp.Status) + } + // FIXME: add more resp.Status checks. + return ioutil.ReadAll(resp.Body) +} + +var TokenUri = "http://cloud.paddlepaddle.org" + +func getUserName(uri string, token string) (string, error) { + authHeader := make(map[string]string) + authHeader["Authorization"] = "Token " + token + req, err := makeRequest(uri, "GET", nil, "", nil, authHeader) + if err != nil { + return "", err + } + + body, err := getResponse(req) + if err != nil { + return "", err + } + + log.V(4).Infoln("get token2user resp:" + string(body[:])) + var resp interface{} + if err := json.Unmarshal(body, &resp); err != nil { + return "", err + } + + user := resp.(map[string]interface{})["user"].(string) + if len(user) < 1 { + return "", errors.New("can't get username") + } + + return user, nil +} + +func cmdHandler(w http.ResponseWriter, req string, cmd pfsmod.Command, header http.Header) { + resp := response{} + + user, err := getUserName(TokenUri+"/api/v1/token2user/", header.Get("Authorization")) + if err != nil { + resp.Err = "get username error:" + err.Error() + writeJSONResponse(w, req, http.StatusOK, resp) + return + } + + if err := cmd.ValidateCloudArgs(user); err != nil { + resp.Err = err.Error() + writeJSONResponse(w, req, http.StatusOK, resp) + return + } + + result, err := cmd.Run() + if err != nil { + resp.Err = err.Error() + writeJSONResponse(w, req, http.StatusOK, resp) + return + } + + resp.Results = result + writeJSONResponse(w, req, http.StatusOK, resp) +} + +func lsHandler(w http.ResponseWriter, r *http.Request) { + cmd, err := pfsmod.NewLsCmdFromURLParam(r.URL.RawQuery) + + resp := response{} + if err != nil { + resp.Err = err.Error() + writeJSONResponse(w, r.URL.RawQuery, http.StatusOK, resp) + return + } + + cmdHandler(w, r.URL.RawQuery, cmd, r.Header) +} + +func statHandler(w http.ResponseWriter, r *http.Request) { + log.V(1).Info("begin stathandler") + cmd, err := pfsmod.NewStatCmdFromURLParam(r.URL.RawQuery) + + resp := response{} + if err != nil { + resp.Err = err.Error() + writeJSONResponse(w, r.URL.RawQuery, http.StatusOK, resp) + return + } + + cmdHandler(w, r.URL.RawQuery, cmd, r.Header) +} + +func writeJSONResponse(w http.ResponseWriter, + req string, + httpStatus int, + resp response) { + if httpStatus != http.StatusOK || len(resp.Err) > 0 { + log.Errorf("%s httpStatus:%d resp:=%v\n", + req, httpStatus, resp.Err) + } else { + log.Infof("%s httpStatus:%d\n", + req, httpStatus) + + log.V(1).Infof("%s httpStatus:%d resp:%#v\n", + req, httpStatus, resp) + } + + w.Header().Set("Content-Type", "application/json; charset=UTF-8") + w.WriteHeader(httpStatus) + + if err := json.NewEncoder(w).Encode(resp); err != nil { + log.Error(err) + } +} + +// GetFilesHandler processes files's GET request. +func GetFilesHandler(w http.ResponseWriter, r *http.Request) { + method := r.URL.Query().Get("method") + log.V(3).Infoln(r.URL.RawQuery) + + switch method { + case "ls": + lsHandler(w, r) + case "md5sum": + // TODO + // err := md5Handler(w, r) + case "stat": + statHandler(w, r) + default: + resp := response{} + writeJSONResponse(w, r.URL.RawQuery, + http.StatusMethodNotAllowed, resp) + } +} + +func rmHandler(w http.ResponseWriter, body []byte, header http.Header) { + log.V(1).Infof("begin proc rmHandler\n") + cmd := pfsmod.RmCmd{} + + resp := response{} + if err := json.Unmarshal(body, &cmd); err != nil { + writeJSONResponse(w, string(body[:]), http.StatusOK, resp) + return + } + + log.V(1).Infof("request :%#v\n", cmd) + + cmdHandler(w, string(body[:]), &cmd, header) + log.V(1).Infof("end proc handler\n") + +} + +func mkdirHandler(w http.ResponseWriter, body []byte, header http.Header) { + log.V(1).Infof("begin proc mkdir\n") + cmd := pfsmod.MkdirCmd{} + + resp := response{} + if err := json.Unmarshal(body, &cmd); err != nil { + writeJSONResponse(w, string(body[:]), http.StatusOK, resp) + return + } + + log.V(1).Infof("request :%#v\n", cmd) + + cmdHandler(w, string(body[:]), &cmd, header) + log.V(1).Infof("end proc mkdir\n") + +} + +func touchHandler(w http.ResponseWriter, body []byte, header http.Header) { + log.V(1).Infof("begin proc touch\n") + cmd := pfsmod.TouchCmd{} + + resp := response{} + if err := json.Unmarshal(body, &cmd); err != nil { + writeJSONResponse(w, string(body[:]), http.StatusOK, resp) + return + } + + log.V(1).Infof("request :%#v\n", cmd) + + cmdHandler(w, string(body[:]), &cmd, header) + log.V(1).Infof("end proc touch\n") +} + +func getBody(r *http.Request) ([]byte, error) { + body, err := ioutil.ReadAll(io.LimitReader(r.Body, pfsmod.MaxJSONRequestSize)) + if err != nil { + return nil, err + } + + if err := r.Body.Close(); err != nil { + return nil, err + } + + return body, nil +} + +func getMethod(body []byte) (string, error) { + o, err := sjson.NewJson(body) + if err != nil { + return "", errors.New(pfsmod.StatusJSONErr) + } + + j := o.Get("method") + if j == nil { + return "", errors.New(pfsmod.StatusJSONErr) + } + + method, _ := j.String() + if err != nil { + return "", errors.New(pfsmod.StatusJSONErr) + } + + return method, nil +} + +func modifyFilesHandler(w http.ResponseWriter, r *http.Request) { + resp := response{} + body, err := getBody(r) + log.V(3).Infof(string(body[:])) + if err != nil { + resp.Err = err.Error() + writeJSONResponse(w, string(body[:]), http.StatusOK, resp) + return + } + + method, err := getMethod(body) + if err != nil { + resp.Err = err.Error() + writeJSONResponse(w, string(body[:]), http.StatusOK, resp) + return + } + + switch method { + case "rm": + rmHandler(w, body, r.Header) + case "touch": + touchHandler(w, body, r.Header) + case "mkdir": + mkdirHandler(w, body, r.Header) + default: + resp := response{} + writeJSONResponse(w, string(body[:]), http.StatusMethodNotAllowed, resp) + } +} + +// DeleteFilesHandler processes files' DELETE request. +func DeleteFilesHandler(w http.ResponseWriter, r *http.Request) { + log.V(1).Infof("begin DeleteFilesHandler") + + modifyFilesHandler(w, r) + + log.V(1).Infof("end DeleteFilesHandler") +} + +// PostFilesHandler processes files' POST request. +func PostFilesHandler(w http.ResponseWriter, r *http.Request) { + log.V(1).Infof("begin PostFilesHandler") + + modifyFilesHandler(w, r) + + log.V(1).Infof("end PostFilesHandler") +} + +func getChunkMetaHandler(w http.ResponseWriter, r *http.Request) { + log.V(1).Infof("begin proc getChunkMeta\n") + cmd, err := pfsmod.NewChunkMetaCmdFromURLParam(r) + resp := response{} + + if err != nil { + resp.Err = err.Error() + writeJSONResponse(w, r.URL.RawQuery, http.StatusOK, resp) + return + } + + cmdHandler(w, r.URL.RawQuery, cmd, r.Header) + log.V(1).Infof("end proc getChunkMeta\n") +} + +// GetChunkMetaHandler processes GET ChunkMeta request. +func GetChunkMetaHandler(w http.ResponseWriter, r *http.Request) { + method := r.URL.Query().Get("method") + + switch method { + case "GetChunkMeta": + getChunkMetaHandler(w, r) + default: + writeJSONResponse(w, r.URL.RawQuery, http.StatusMethodNotAllowed, response{}) + } +} + +// GetChunkHandler processes GET Chunk request. +func GetChunkHandler(w http.ResponseWriter, r *http.Request) { + log.V(1).Infof("begin proc GetChunkHandler") + + chunk, err := pfsmod.ParseChunk(r.URL.RawQuery) + if err != nil { + writeJSONResponse(w, r.URL.RawQuery, http.StatusOK, response{}) + return + } + + writer := multipart.NewWriter(w) + writer.SetBoundary(pfsmod.DefaultMultiPartBoundary) + + fileName := chunk.ToURLParam().Encode() + part, err := writer.CreateFormFile("chunk", fileName) + if err != nil { + log.Error(err) + return + } + + if err = chunk.LoadChunkData(part); err != nil { + log.Error(err) + return + } + + err = writer.Close() + if err != nil { + log.Error(err) + return + } + + log.V(1).Info("end proc GetChunkHandler") + return +} + +// PostChunkHandler processes POST Chunk request. +func PostChunkHandler(w http.ResponseWriter, r *http.Request) { + log.V(1).Infof("begin proc PostChunksHandler\n") + + resp := response{} + partReader, err := r.MultipartReader() + if err != nil { + writeJSONResponse(w, "ChunkHandler", http.StatusBadRequest, resp) + return + } + + for { + part, err := partReader.NextPart() + if err == io.EOF { + break + } + + if part.FormName() != "chunk" { + continue + } + + cmd, err := pfsmod.ParseChunk(part.FileName()) + if err != nil { + resp.Err = err.Error() + writeJSONResponse(w, part.FileName(), http.StatusOK, resp) + return + } + + log.V(1).Infof("recv cmd:%#v\n", cmd) + + if err := cmd.SaveChunkData(part); err != nil { + resp.Err = err.Error() + writeJSONResponse(w, part.FileName(), http.StatusOK, resp) + return + } + + writeJSONResponse(w, part.FileName(), http.StatusOK, resp) + } + + log.V(1).Infof("end proc PostChunksHandler\n") +} diff --git a/go/filemanager/pfsserver/router.go b/go/filemanager/pfsserver/router.go new file mode 100644 index 00000000..2f4f6c86 --- /dev/null +++ b/go/filemanager/pfsserver/router.go @@ -0,0 +1,46 @@ +package pfsserver + +import ( + "net/http" + "time" + + log "github.com/golang/glog" + "github.com/gorilla/mux" +) + +func logger(inner http.Handler, name string) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + start := time.Now() + + inner.ServeHTTP(w, r) + + log.Infof( + "%s\t%s\t%s\t%s", + r.Method, + r.RequestURI, + name, + time.Since(start), + ) + }) +} + +// NewRouter returns a new Router struct. +func NewRouter() *mux.Router { + + router := mux.NewRouter().StrictSlash(true) + for _, route := range routes { + var handler http.Handler + + handler = route.HandlerFunc + handler = logger(handler, route.Name) + + router. + Methods(route.Method). + Path(route.Pattern). + Name(route.Name). + Handler(handler) + + } + + return router +} diff --git a/go/filemanager/pfsserver/routes.go b/go/filemanager/pfsserver/routes.go new file mode 100644 index 00000000..375265ae --- /dev/null +++ b/go/filemanager/pfsserver/routes.go @@ -0,0 +1,56 @@ +package pfsserver + +import ( + "net/http" +) + +// Route represents route struct. +type Route struct { + Name string + Method string + Pattern string + HandlerFunc http.HandlerFunc +} + +type Routes []Route + +var routes = Routes{ + Route{ + "GetFiles", + "GET", + "/api/v1/files", + GetFilesHandler, + }, + Route{ + "PostFiles", + "POST", + "/api/v1/files", + PostFilesHandler, + }, + Route{ + "DeleteFiles", + "DELETE", + "/api/v1/files", + DeleteFilesHandler, + }, + + Route{ + "GetChunksMeta", + "GET", + "/api/v1/chunks", + GetChunkMetaHandler, + }, + Route{ + "GetChunksData", + "GET", + "/api/v1/storage/chunks", + GetChunkHandler, + }, + + Route{ + "PostChunksData", + "POST", + "/api/v1/storage/chunks", + PostChunkHandler, + }, +} diff --git a/go/paddlecloud/download.go b/go/paddlecloud/download.go new file mode 100644 index 00000000..7ef388ce --- /dev/null +++ b/go/paddlecloud/download.go @@ -0,0 +1,184 @@ +package paddlecloud + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "mime/multipart" + "os" + "path/filepath" + + pfsmod "github.com/PaddlePaddle/cloud/go/filemanager/pfsmodules" + log "github.com/golang/glog" +) + +func remoteChunkMeta(path string, + chunkSize int64) ([]pfsmod.ChunkMeta, error) { + cmd := pfsmod.ChunkMetaCmd{ + Method: pfsmod.ChunkMetaCmdName, + FilePath: path, + ChunkSize: chunkSize, + } + + t := fmt.Sprintf("%s/api/v1/chunks", config.ActiveConfig.Endpoint) + ret, err := GetCall(t, cmd.ToURLParam()) + if err != nil { + return nil, err + } + + type chunkMetaResponse struct { + Err string `json:"err"` + Results []pfsmod.ChunkMeta `json:"results"` + } + + resp := chunkMetaResponse{} + if err := json.Unmarshal(ret, &resp); err != nil { + return nil, err + } + + if len(resp.Err) == 0 { + return resp.Results, nil + } + + return resp.Results, errors.New(resp.Err) +} + +func getChunkData(target string, chunk pfsmod.Chunk, dst string) error { + log.V(1).Info("target url: " + target) + + resp, err := GetChunk(target, chunk.ToURLParam()) + if err != nil { + return err + } + defer pfsmod.Close(resp.Body) + + if resp.Status != HTTPOK { + return errors.New("http server returned non-200 status: " + resp.Status) + } + + partReader := multipart.NewReader(resp.Body, pfsmod.DefaultMultiPartBoundary) + for { + part, error := partReader.NextPart() + if error == io.EOF { + break + } + + if part.FormName() == "chunk" { + recvCmd, err := pfsmod.ParseChunk(part.FileName()) + if err != nil { + return errors.New(err.Error()) + } + + recvCmd.Path = dst + + if err := recvCmd.SaveChunkData(part); err != nil { + return err + } + } + } + + return nil +} + +func downloadChunks(src string, + dst string, diffMeta []pfsmod.ChunkMeta) error { + if len(diffMeta) == 0 { + log.V(1).Infof("srcfile:%s and dstfile:%s are already same\n", src, dst) + fmt.Printf("download ok\n") + return nil + } + + t := fmt.Sprintf("%s/api/v1/storage/chunks", config.ActiveConfig.Endpoint) + for _, meta := range diffMeta { + chunk := pfsmod.Chunk{ + Path: src, + Offset: meta.Offset, + Size: meta.Len, + } + + err := getChunkData(t, chunk, dst) + if err != nil { + return err + } + } + + return nil +} + +func downloadFile(src string, srcFileSize int64, dst string) error { + srcMeta, err := remoteChunkMeta(src, defaultChunkSize) + if err != nil { + return err + } + log.V(4).Infof("srcMeta:%#v\n\n", srcMeta) + + dstMeta, err := pfsmod.GetChunkMeta(dst, defaultChunkSize) + if err != nil { + if os.IsNotExist(err) { + if err := pfsmod.CreateSizedFile(dst, srcFileSize); err != nil { + return err + } + } else { + return err + } + } + log.V(4).Infof("dstMeta:%#v\n", dstMeta) + + diffMeta, err := pfsmod.GetDiffChunkMeta(srcMeta, dstMeta) + if err != nil { + return err + } + + err = downloadChunks(src, dst, diffMeta) + return err +} + +func checkBeforeDownLoad(src []pfsmod.LsResult, dst string) (bool, error) { + var bDir bool + fi, err := os.Stat(dst) + if err == nil { + bDir = fi.IsDir() + if !fi.IsDir() && len(src) > 1 { + return bDir, errors.New(pfsmod.StatusDestShouldBeDirectory) + } + } else if os.IsNotExist(err) { + return false, nil + } + + return bDir, err +} + +func download(src, dst string) error { + log.V(1).Infof("download %s to %s\n", src, dst) + lsRet, err := RemoteLs(pfsmod.NewLsCmd(true, src)) + if err != nil { + return err + } + + bDir, err := checkBeforeDownLoad(lsRet, dst) + if err != nil { + return err + } + + for _, attr := range lsRet { + if attr.IsDir { + return errors.New(pfsmod.StatusOnlySupportFiles) + } + + realSrc := attr.Path + realDst := dst + + if bDir { + _, file := filepath.Split(attr.Path) + realDst = dst + "/" + file + } + + fmt.Printf("download src_path:%s dst_path:%s\n", realSrc, realDst) + if err := downloadFile(realSrc, attr.Size, realDst); err != nil { + return err + } + } + + return nil +} diff --git a/go/paddlecloud/pfscp.go b/go/paddlecloud/pfscp.go new file mode 100644 index 00000000..e0c592d6 --- /dev/null +++ b/go/paddlecloud/pfscp.go @@ -0,0 +1,97 @@ +package paddlecloud + +import ( + "context" + "errors" + "flag" + "fmt" + + pfsmod "github.com/PaddlePaddle/cloud/go/filemanager/pfsmodules" + "github.com/google/subcommands" +) + +const ( + defaultChunkSize = 2 * 1024 * 1024 +) + +// CpCommand represents a copy command. +type CpCommand struct { + cmd pfsmod.CpCmd +} + +// Name returns CpCommand's name. +func (*CpCommand) Name() string { return "cp" } + +// Synopsis returns synopsis of CpCommand. +func (*CpCommand) Synopsis() string { return "uoload or download files" } + +// Usage returns usage of CpCommand. +func (*CpCommand) Usage() string { + return `cp [-v] + upload or downlod files, does't support directories this version + Options: + ` +} + +// SetFlags sets CpCommand's parameter. +func (p *CpCommand) SetFlags(f *flag.FlagSet) { + f.BoolVar(&p.cmd.V, "v", false, "Cause cp to be verbose, showing files after they are copied.") +} + +// Execute runs CpCommand. +func (p *CpCommand) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus { + if f.NArg() < 2 { + f.Usage() + return subcommands.ExitFailure + } + + cmd, err := pfsmod.NewCpCmdFromFlag(f) + if err != nil { + return subcommands.ExitSuccess + } + + if err := RunCp(cmd); err != nil { + return subcommands.ExitFailure + } + + return subcommands.ExitSuccess +} + +// RunCp runs CpCommand. +func RunCp(cmd *pfsmod.CpCmd) error { + var results []pfsmod.CpCmdResult + + for _, arg := range cmd.Src { + fmt.Println(cmd.PartToString(arg, cmd.Dst)) + + var ret []pfsmod.CpCmdResult + var err error + + if pfsmod.IsCloudPath(arg) { + if pfsmod.IsCloudPath(cmd.Dst) { + err = errors.New(pfsmod.StatusOnlySupportFiles) + } else { + err = download(arg, cmd.Dst) + } + } else { + if pfsmod.IsCloudPath(cmd.Dst) { + err = upload(arg, cmd.Dst) + } else { + //can't do that + err = errors.New(pfsmod.StatusOnlySupportFiles) + } + } + + if err != nil { + fmt.Printf("%#v\n", err) + return err + } + + if ret != nil { + results = append(results, ret...) + } + fmt.Println("") + } + + return nil +} diff --git a/go/paddlecloud/pfsls.go b/go/paddlecloud/pfsls.go new file mode 100644 index 00000000..881180f2 --- /dev/null +++ b/go/paddlecloud/pfsls.go @@ -0,0 +1,131 @@ +package paddlecloud + +import ( + "context" + "encoding/json" + "errors" + "flag" + "fmt" + "time" + + pfsmod "github.com/PaddlePaddle/cloud/go/filemanager/pfsmodules" + log "github.com/golang/glog" + "github.com/google/subcommands" +) + +// LsCommand represents ls command. +type LsCommand struct { + cmd pfsmod.LsCmd +} + +// Name returns LsCommand's name. +func (*LsCommand) Name() string { return "ls" } + +// Synopsis returns Synopsis of LsCommand. +func (*LsCommand) Synopsis() string { return "List files on PaddlePaddle Cloud" } + +// Usage returns usage of LsCommand. +func (*LsCommand) Usage() string { + return `ls [-r] : + List files on PaddlePaddleCloud + Options: +` +} + +// SetFlags sets LsCommand's parameters. +func (p *LsCommand) SetFlags(f *flag.FlagSet) { + f.BoolVar(&p.cmd.R, "r", false, "list files recursively") +} + +// getFormatPrint gets max width of filesize and return format string to print. +func getFormatString(result []pfsmod.LsResult) string { + max := 0 + for _, t := range result { + str := fmt.Sprintf("%d", t.Size) + + if len(str) > max { + max = len(str) + } + } + + return fmt.Sprintf("%%s %%s %%%dd %%s\n", max) +} + +func formatPrint(result []pfsmod.LsResult) { + formatStr := getFormatString(result) + + for _, t := range result { + timeStr := time.Unix(0, t.ModTime).Format("2006-01-02 15:04:05") + + if t.IsDir { + fmt.Printf(formatStr, timeStr, "d", t.Size, t.Path) + } else { + fmt.Printf(formatStr, timeStr, "f", t.Size, t.Path) + } + } + + fmt.Printf("\n") +} + +// RemoteLs gets LsCmd result from cloud. +func RemoteLs(cmd *pfsmod.LsCmd) ([]pfsmod.LsResult, error) { + t := fmt.Sprintf("%s/api/v1/files", config.ActiveConfig.Endpoint) + body, err := GetCall(t, cmd.ToURLParam()) + if err != nil { + return nil, err + } + + type lsResponse struct { + Err string `json:"err"` + Results []pfsmod.LsResult `json:"results"` + } + + resp := lsResponse{} + if err := json.Unmarshal(body, &resp); err != nil { + return resp.Results, err + } + + if len(resp.Err) == 0 { + return resp.Results, nil + } + + return resp.Results, errors.New(resp.Err) +} + +func remoteLs(cmd *pfsmod.LsCmd) error { + for _, arg := range cmd.Args { + subcmd := pfsmod.NewLsCmd( + cmd.R, + arg, + ) + result, err := RemoteLs(subcmd) + + fmt.Printf("%s :\n", arg) + if err != nil { + fmt.Printf(" error:%s\n\n", err.Error()) + return err + } + + formatPrint(result) + } + return nil +} + +// Execute runs a LsCommand. +func (p *LsCommand) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus { + if f.NArg() < 1 { + f.Usage() + return subcommands.ExitFailure + } + + cmd, err := pfsmod.NewLsCmdFromFlag(f) + if err != nil { + return subcommands.ExitFailure + } + log.V(1).Infof("%#v\n", cmd) + + if err := remoteLs(cmd); err != nil { + return subcommands.ExitFailure + } + return subcommands.ExitSuccess +} diff --git a/go/paddlecloud/pfsmkdir.go b/go/paddlecloud/pfsmkdir.go new file mode 100644 index 00000000..4510b2b9 --- /dev/null +++ b/go/paddlecloud/pfsmkdir.go @@ -0,0 +1,109 @@ +package paddlecloud + +import ( + "context" + "encoding/json" + "errors" + "flag" + "fmt" + + pfsmod "github.com/PaddlePaddle/cloud/go/filemanager/pfsmodules" + log "github.com/golang/glog" + "github.com/google/subcommands" +) + +// MkdirCommand represents mkdir command. +type MkdirCommand struct { + //cmd pfsmod.MkdirCmd +} + +// Name returns name of MkdirComand. +func (*MkdirCommand) Name() string { return "mkdir" } + +// Synopsis returns synopsis of MkdirCommand. +func (*MkdirCommand) Synopsis() string { return "mkdir directoies on PaddlePaddle Cloud" } + +// Usage returns usage of MkdirCommand. +func (*MkdirCommand) Usage() string { + return `mkdir : + mkdir directories on PaddlePaddleCloud + Options: +` +} + +// SetFlags sets MkdirCommand's parameters. +func (p *MkdirCommand) SetFlags(f *flag.FlagSet) { +} + +func formatMkdirPrint(results []pfsmod.MkdirResult, err error) { + if err != nil { + fmt.Println("\t" + err.Error()) + } +} + +// RemoteMkdir creat a directory on cloud. +func RemoteMkdir(cmd *pfsmod.MkdirCmd) ([]pfsmod.MkdirResult, error) { + j, err := cmd.ToJSON() + if err != nil { + return nil, err + } + + t := fmt.Sprintf("%s/api/v1/files", config.ActiveConfig.Endpoint) + log.V(2).Infoln(t) + body, err := PostCall(t, j) + if err != nil { + return nil, err + } + + log.V(3).Info(string(body[:])) + + type mkdirResponse struct { + Err string `json:"err"` + Results []pfsmod.MkdirResult `json:"results"` + } + + resp := mkdirResponse{} + if err := json.Unmarshal(body, &resp); err != nil { + return resp.Results, err + } + + log.V(1).Infof("%#v\n", resp) + + if len(resp.Err) == 0 { + return resp.Results, nil + } + + return resp.Results, errors.New(resp.Err) +} + +func remoteMkdir(cmd *pfsmod.MkdirCmd) error { + for _, arg := range cmd.Args { + subcmd := pfsmod.NewMkdirCmd(arg) + + fmt.Printf("mkdir %s\n", arg) + results, err := RemoteMkdir(subcmd) + formatMkdirPrint(results, err) + } + return nil + +} + +// Execute runs a MkdirCommand. +func (p *MkdirCommand) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus { + if f.NArg() < 1 { + f.Usage() + return subcommands.ExitFailure + } + + cmd, err := pfsmod.NewMkdirCmdFromFlag(f) + if err != nil { + return subcommands.ExitFailure + } + log.V(1).Infof("%#v\n", cmd) + + if err := remoteMkdir(cmd); err != nil { + return subcommands.ExitFailure + } + + return subcommands.ExitSuccess +} diff --git a/go/paddlecloud/pfsrm.go b/go/paddlecloud/pfsrm.go new file mode 100644 index 00000000..e3f78e9f --- /dev/null +++ b/go/paddlecloud/pfsrm.go @@ -0,0 +1,118 @@ +package paddlecloud + +import ( + "context" + "encoding/json" + "errors" + "flag" + "fmt" + + pfsmod "github.com/PaddlePaddle/cloud/go/filemanager/pfsmodules" + log "github.com/golang/glog" + "github.com/google/subcommands" +) + +// RmCommand represents remove command. +type RmCommand struct { + cmd pfsmod.RmCmd +} + +// Name returns RmCommand's name. +func (*RmCommand) Name() string { return "rm" } + +// Synopsis returns synopsis of RmCommand. +func (*RmCommand) Synopsis() string { return "rm files on PaddlePaddle Cloud" } + +// Usage returns usage of RmCommand. +func (*RmCommand) Usage() string { + return `rm -r : + rm files on PaddlePaddleCloud + Options: +` +} + +// SetFlags sets RmCommand's parameters. +func (p *RmCommand) SetFlags(f *flag.FlagSet) { + f.BoolVar(&p.cmd.R, "r", false, "rm files recursively") +} + +func formatRmPrint(results []pfsmod.RmResult, err error) { + for _, result := range results { + fmt.Printf("rm %s\n", result.Path) + } + + if err != nil { + fmt.Println("\t" + err.Error()) + } + + return +} + +// RemoteRm gets RmCmd Result from cloud. +func RemoteRm(cmd *pfsmod.RmCmd) ([]pfsmod.RmResult, error) { + j, err := cmd.ToJSON() + if err != nil { + return nil, err + } + + t := fmt.Sprintf("%s/api/v1/files", config.ActiveConfig.Endpoint) + body, err := DeleteCall(t, j) + if err != nil { + return nil, err + } + + log.V(3).Info(string(body[:])) + + type rmResponse struct { + Err string `json:"err"` + Results []pfsmod.RmResult `json:"path"` + } + + resp := rmResponse{} + if err := json.Unmarshal(body, &resp); err != nil { + return resp.Results, err + } + + log.V(1).Infof("%#v\n", resp) + + if len(resp.Err) == 0 { + return resp.Results, nil + } + + return resp.Results, errors.New(resp.Err) +} + +func remoteRm(cmd *pfsmod.RmCmd) error { + for _, arg := range cmd.Args { + subcmd := pfsmod.NewRmCmd( + cmd.R, + arg, + ) + + fmt.Printf("rm %s\n", arg) + result, err := RemoteRm(subcmd) + formatRmPrint(result, err) + } + return nil + +} + +// Execute runs a RmCommand. +func (p *RmCommand) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus { + if f.NArg() < 1 { + f.Usage() + return subcommands.ExitFailure + } + + cmd, err := pfsmod.NewRmCmdFromFlag(f) + if err != nil { + return subcommands.ExitFailure + } + log.V(1).Infof("%#v\n", cmd) + + if err := remoteRm(cmd); err != nil { + return subcommands.ExitFailure + } + + return subcommands.ExitSuccess +} diff --git a/go/paddlecloud/restclient.go b/go/paddlecloud/restclient.go index d09382fc..813ca996 100644 --- a/go/paddlecloud/restclient.go +++ b/go/paddlecloud/restclient.go @@ -10,6 +10,8 @@ import ( "net/http" "net/url" "os" + + log "github.com/golang/glog" ) // HTTPOK is ok status of http api call. @@ -20,10 +22,20 @@ var httpClient = &http.Client{Transport: &http.Transport{}} func makeRequest(uri string, method string, body io.Reader, contentType string, query url.Values, authHeader map[string]string) (*http.Request, error) { + + if query != nil { + uri = fmt.Sprintf("%s?%s", uri, query.Encode()) + log.V(4).Infoln(uri) + } + + log.V(4).Infof("%s %s %T\n", method, uri, body) + req, err := http.NewRequest(method, uri, body) if err != nil { + log.Errorf("new request %v\n", err) return nil, err } + // default contentType is application/json. if len(contentType) == 0 { req.Header.Set("Content-Type", "application/json") @@ -35,9 +47,6 @@ func makeRequest(uri string, method string, body io.Reader, req.Header.Set(k, v) } - if query != nil { - req.URL.RawQuery = query.Encode() - } return req, nil } @@ -59,6 +68,7 @@ func makeRequestToken(uri string, method string, body io.Reader, func getResponse(req *http.Request) ([]byte, error) { resp, err := httpClient.Do(req) if err != nil { + log.Errorf("httpClient do error %v\n", err) return []byte{}, err } defer resp.Body.Close() @@ -131,7 +141,7 @@ func PostFile(targetURL string, filename string) ([]byte, error) { return getResponse(req) } -// PostChunkData makes a POST call to HTTP server to upload chunkdata. +// PostChunk makes a POST call to HTTP server to upload chunkdata. func PostChunk(targetURL string, chunkName string, reader io.Reader, len int64, boundary string) ([]byte, error) { body := &bytes.Buffer{} @@ -140,6 +150,7 @@ func PostChunk(targetURL string, return nil, err } + log.V(4).Infoln(chunkName) part, err := writer.CreateFormFile("chunk", chunkName) if err != nil { return nil, err @@ -150,18 +161,24 @@ func PostChunk(targetURL string, return nil, err } + err = writer.Close() + if err != nil { + return nil, err + } + contentType := writer.FormDataContentType() - writer.Close() + log.V(4).Infoln("before makeRequestToken") req, err := makeRequestToken(targetURL, "POST", body, contentType, nil) if err != nil { - return []byte{}, err + return nil, err } + log.V(4).Infoln("before getResponse") return getResponse(req) } -// GetChunkData makes a GET call to HTTP server to download chunk data. +// GetChunk makes a GET call to HTTP server to download chunk data. func GetChunk(targetURL string, query url.Values) (*http.Response, error) { req, err := makeRequestToken(targetURL, "GET", nil, "", query) diff --git a/go/paddlecloud/upload.go b/go/paddlecloud/upload.go new file mode 100644 index 00000000..75756bc1 --- /dev/null +++ b/go/paddlecloud/upload.go @@ -0,0 +1,227 @@ +package paddlecloud + +import ( + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "strings" + + pfsmod "github.com/PaddlePaddle/cloud/go/filemanager/pfsmodules" + log "github.com/golang/glog" +) + +func remoteStat(cmd *pfsmod.StatCmd) (*pfsmod.LsResult, error) { + t := fmt.Sprintf("%s/api/v1/files", config.ActiveConfig.Endpoint) + log.V(3).Infoln(t) + body, err := GetCall(t, cmd.ToURLParam()) + if err != nil { + return nil, err + } + + type statResponse struct { + Err string `json:"err"` + Results pfsmod.LsResult `json:"results"` + } + + resp := statResponse{} + if err := json.Unmarshal(body, &resp); err != nil { + return nil, err + } + + log.V(1).Infof("result:%#v\n", resp) + + if len(resp.Err) != 0 { + return nil, errors.New(resp.Err) + } + + return &resp.Results, nil +} + +func remoteTouch(cmd *pfsmod.TouchCmd) error { + j, err := cmd.ToJSON() + if err != nil { + return err + } + + t := fmt.Sprintf("%s/api/v1/files", config.ActiveConfig.Endpoint) + body, err := PostCall(t, j) + if err != nil { + return err + } + + type touchResponse struct { + Err string `json:"err"` + Results pfsmod.TouchResult `json:"results"` + } + + resp := touchResponse{} + if err := json.Unmarshal(body, &resp); err != nil { + return err + } + + if len(resp.Err) == 0 { + return nil + } + + return errors.New(resp.Err) +} + +type uploadChunkResponse struct { + Err string `json:"err"` +} + +func getChunkReader(path string, offset int64) (*os.File, error) { + f, err := os.Open(path) + if err != nil { + return nil, err + } + + _, err = f.Seek(offset, 0) + if err != nil { + pfsmod.Close(f) + return nil, err + } + + return f, nil +} + +func getDstParam(src *pfsmod.Chunk, dst string) string { + cmd := pfsmod.Chunk{ + Path: dst, + Offset: src.Offset, + Size: src.Size, + } + + return cmd.ToURLParam().Encode() +} + +func postChunk(src *pfsmod.Chunk, dst string) ([]byte, error) { + f, err := getChunkReader(src.Path, src.Offset) + if err != nil { + return nil, err + } + defer pfsmod.Close(f) + + t := fmt.Sprintf("%s/api/v1/storage/chunks", config.ActiveConfig.Endpoint) + log.V(4).Infoln(t) + + return PostChunk(t, getDstParam(src, dst), + f, src.Size, pfsmod.DefaultMultiPartBoundary) +} + +func uploadChunks(src string, + dst string, + diffMeta []pfsmod.ChunkMeta) error { + if len(diffMeta) == 0 { + log.V(1).Infof("srcfile:%s and destfile:%s are same\n", src, dst) + return nil + } + + for _, meta := range diffMeta { + log.V(3).Infof("diffMeta:%v\n", meta) + + chunk := pfsmod.Chunk{ + Path: src, + Offset: meta.Offset, + Size: meta.Len, + } + + body, err := postChunk(&chunk, dst) + if err != nil { + return err + } + + resp := uploadChunkResponse{} + if err := json.Unmarshal(body, &resp); err != nil { + return err + } + + if len(resp.Err) == 0 { + continue + } + + return errors.New(resp.Err) + } + + return nil +} + +func uploadFile(src, dst string, srcFileSize int64) error { + + log.V(1).Infof("touch %s size:%d\n", dst, srcFileSize) + + cmd := pfsmod.TouchCmd{ + Method: pfsmod.TouchCmdName, + Path: dst, + FileSize: srcFileSize, + } + + if err := remoteTouch(&cmd); err != nil { + return err + } + + dstMeta, err := remoteChunkMeta(dst, defaultChunkSize) + if err != nil { + return err + } + log.V(2).Infof("dst %s chunkMeta:%#v\n", dst, dstMeta) + + srcMeta, err := pfsmod.GetChunkMeta(src, defaultChunkSize) + if err != nil { + return err + } + log.V(2).Infof("src %s chunkMeta:%#v\n", src, srcMeta) + + diffMeta, err := pfsmod.GetDiffChunkMeta(srcMeta, dstMeta) + if err != nil { + return err + } + log.V(2).Infof("diff chunkMeta:%#v\n", diffMeta) + + return uploadChunks(src, dst, diffMeta) +} + +func upload(src, dst string) error { + lsCmd := pfsmod.NewLsCmd(true, src) + srcRet, err := lsCmd.Run() + if err != nil { + return err + } + log.V(1).Infof("ls src:%s result:%#v\n", src, srcRet) + + dstMeta, err := remoteStat(&pfsmod.StatCmd{Path: dst, Method: pfsmod.StatCmdName}) + if err != nil && !strings.Contains(err.Error(), pfsmod.StatusFileNotFound) { + return err + } + log.V(1).Infof("stat dst:%s result:%#v\n", dst, dstMeta) + + srcMetas := srcRet.([]pfsmod.LsResult) + + for _, srcMeta := range srcMetas { + if srcMeta.IsDir { + return errors.New(pfsmod.StatusOnlySupportFiles) + } + + realSrc := srcMeta.Path + realDst := dst + + _, file := filepath.Split(srcMeta.Path) + if dstMeta != nil && dstMeta.IsDir { + realDst = dst + "/" + file + } + + log.V(1).Infof("upload src_path:%s src_file_size:%d dst_path:%s\n", + realSrc, srcMeta.Size, realDst) + fmt.Printf("uploading %s to %s", realSrc, realDst) + if err := uploadFile(realSrc, realDst, srcMeta.Size); err != nil { + fmt.Printf(" error %v\n", err) + return err + } + + fmt.Printf(" ok!\n") + } + + return nil +}