forked from pachyderm/pachyderm
/
local_client.go
116 lines (98 loc) · 2.46 KB
/
local_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package obj
import (
"io"
"os"
"path/filepath"
"strings"
)
// NewLocalClient returns a Client that stores data on the local file system.
// The directory root, along with any missing parents, is created with mode
// 0755 before the client is returned; a failure to create it is returned
// as the error.
func NewLocalClient(root string) (Client, error) {
	err := os.MkdirAll(root, 0755)
	if err != nil {
		return nil, err
	}
	client := &localClient{root: root}
	return client, nil
}
// localClient is the type NewLocalClient returns (as a Client). Its methods
// resolve every path they are given relative to root.
type localClient struct {
// root is the directory onto which method path arguments are joined.
root string
}
// Writer opens path, resolved relative to the client's root, for writing.
// Missing parent directories are created with mode 0755. The file is opened
// with os.Create, so an existing file at that location is truncated.
// The caller is responsible for closing the returned writer.
func (c *localClient) Writer(path string) (io.WriteCloser, error) {
	target := filepath.Join(c.root, path)
	parent := filepath.Dir(target)
	// The parent directory may not exist yet, so make sure it does.
	if mkErr := os.MkdirAll(parent, 0755); mkErr != nil {
		return nil, mkErr
	}
	f, createErr := os.Create(target)
	if createErr != nil {
		return nil, createErr
	}
	return f, nil
}
// Reader opens path, resolved relative to the client's root, and returns a
// reader positioned at offset.
//
// When size is 0 the open file itself is returned, so reads continue until
// end of file. Otherwise the result is limited to size bytes starting at
// offset. In both cases closing the returned reader closes the file.
//
// An error is returned if the file cannot be opened or the seek to offset
// fails; the file is closed before a seek error is reported.
func (c *localClient) Reader(path string, offset uint64, size uint64) (io.ReadCloser, error) {
	file, err := os.Open(filepath.Join(c.root, path))
	if err != nil {
		return nil, err
	}
	if _, err := file.Seek(int64(offset), io.SeekStart); err != nil {
		// Do not leak the descriptor. The close error is dropped on purpose:
		// the seek failure is the error the caller needs to see.
		_ = file.Close()
		return nil, err
	}
	if size == 0 {
		// The file is already positioned at offset by the seek above.
		return file, nil
	}
	return newSectionReadCloser(file, offset, size), nil
}
// Delete removes the entry at path, resolved relative to the client's root,
// and returns whatever error os.Remove reports.
func (c *localClient) Delete(path string) error {
	target := filepath.Join(c.root, path)
	return os.Remove(target)
}
// Walk calls walkFn for each non-directory entry found beneath dir, passing
// the entry's path relative to the client's root.
//
// If dir (joined onto root) is not an existing directory, it is split into a
// parent directory and a final element; the parent is walked and only entries
// whose base name starts with that final element are reported.
//
// Errors for which c.IsNotExist is true are skipped during the walk. Any
// other walk error, a failure to compute the relative path, or an error
// returned by walkFn stops the walk and is returned.
func (c *localClient) Walk(dir string, walkFn func(name string) error) error {
	dir = filepath.Join(c.root, dir)
	// The Stat error is intentionally ignored: a path that cannot be statted
	// leaves fi nil, which selects the prefix-matching mode below.
	fi, _ := os.Stat(dir)
	prefix := ""
	if fi == nil || !fi.IsDir() {
		dir, prefix = filepath.Split(dir)
	}
	return filepath.Walk(dir, func(path string, fileInfo os.FileInfo, err error) error {
		if err != nil {
			if c.IsNotExist(err) {
				return nil
			}
			return err
		}
		if fileInfo.IsDir() {
			return nil
		}
		relPath, err := filepath.Rel(c.root, path)
		if err != nil {
			// Never hand walkFn a name that could not be computed.
			return err
		}
		// NOTE(review): the prefix is compared against the base name of every
		// entry at any depth, not against the full relative path — confirm
		// this is the intended matching rule.
		if !strings.HasPrefix(filepath.Base(relPath), prefix) {
			return nil
		}
		return walkFn(relPath)
	})
}
// Exists reports whether path, resolved relative to the client's root, can
// be statted. Any os.Stat error, whatever its cause, yields false.
func (c *localClient) Exists(path string) bool {
	if _, statErr := os.Stat(filepath.Join(c.root, path)); statErr != nil {
		return false
	}
	return true
}
// IsRetryable always returns false for the local client; err is not
// inspected.
func (c *localClient) IsRetryable(err error) bool {
return false
}
// IsNotExist reports whether err indicates that a file or directory does not
// exist.
//
// A nil err yields false. Errors recognised by os.IsNotExist are matched
// regardless of the platform's message text; as a fallback, any error whose
// message contains "no such file or directory" is also treated as
// not-exist, which keeps errors that only carry that text matching.
func (c *localClient) IsNotExist(err error) bool {
	if err == nil {
		return false
	}
	if os.IsNotExist(err) {
		return true
	}
	return strings.Contains(err.Error(), "no such file or directory")
}
// IsIgnorable always returns false for the local client; err is not
// inspected.
func (c *localClient) IsIgnorable(err error) bool {
return false
}
// sectionReadCloser pairs an io.SectionReader over a file with the file
// itself, so that the bounded reader can also be closed.
type sectionReadCloser struct {
	*io.SectionReader
	// f is the file backing the SectionReader; Close closes it.
	f *os.File
}

// newSectionReadCloser returns a reader over the size bytes of f that start
// at offset. Closing the result closes f.
func newSectionReadCloser(f *os.File, offset uint64, size uint64) *sectionReadCloser {
	return &sectionReadCloser{
		SectionReader: io.NewSectionReader(f, int64(offset), int64(size)),
		f:             f,
	}
}

// Close closes the underlying file and returns its error.
func (s *sectionReadCloser) Close() error {
	return s.f.Close()
}