// minio_client.go (forked from pachyderm/pachyderm).

package obj
import (
"io"
minio "github.com/minio/minio-go"
)
// minioClient is a client for any S3-compatible server, paired with the
// bucket its methods operate on. It embeds *minio.Client, so the minio
// client's methods are promoted onto this type.
type minioClient struct {
	*minio.Client

	// bucket is the bucket name passed to every minio call made by this type.
	bucket string
}
// newMinioClient builds a minioClient for bucket on endpoint. The endpoint,
// credentials (id, secret) and the secure flag are handed to minio.New
// unchanged; an error from minio.New is returned as-is with a nil client.
func newMinioClient(endpoint, bucket, id, secret string, secure bool) (*minioClient, error) {
	core, err := minio.New(endpoint, id, secret, secure)
	if err != nil {
		return nil, err
	}
	client := &minioClient{Client: core, bucket: bucket}
	return client, nil
}
// newMinioClientV2 is the counterpart of newMinioClient that constructs the
// underlying client with minio.NewV2 instead of minio.New. All arguments are
// passed through unchanged; a construction error is returned as-is with a
// nil client.
func newMinioClientV2(endpoint, bucket, id, secret string, secure bool) (*minioClient, error) {
	core, err := minio.NewV2(endpoint, id, secret, secure)
	if err != nil {
		return nil, err
	}
	client := &minioClient{Client: core, bucket: bucket}
	return client, nil
}
// minioWriter couples the write half of an in-memory pipe with a channel on
// which the result of an upload is reported.
type minioWriter struct {
	// errChan carries the final error (or nil) of the upload.
	errChan chan error
	// pipe is the write end of the pipe whose read end feeds the upload.
	pipe *io.PipeWriter
}
// newMinioWriter returns a minioWriter whose written bytes are uploaded as
// the object name in client's bucket.
//
// It creates an in-memory pipe and starts a goroutine that runs PutObject on
// the read end with content type "application/octet-stream". The goroutine
// sends PutObject's error (nil on success) on errChan exactly once.
func newMinioWriter(client *minioClient, name string) *minioWriter {
	reader, writer := io.Pipe()
	w := &minioWriter{
		// Buffered with capacity 1 so the upload goroutine can always deliver
		// its result and exit, even if the caller gives up after a failed
		// Write and never receives from the channel. With an unbuffered
		// channel that goroutine would block on the send forever.
		errChan: make(chan error, 1),
		pipe:    writer,
	}
	go func() {
		_, err := client.PutObject(client.bucket, name, reader, "application/octet-stream")
		if err != nil {
			// Fail the read side so pending and future writes on the pipe
			// return err instead of blocking on a reader that has gone away.
			reader.CloseWithError(err)
		}
		w.errChan <- err
	}()
	return w
}
// Write hands p to the write end of the pipe and returns whatever the pipe
// reports: the number of bytes accepted and any error.
func (w *minioWriter) Write(p []byte) (int, error) {
	n, err := w.pipe.Write(p)
	return n, err
}
// Close closes the write end of the pipe and then blocks until a result
// arrives on errChan, returning that result. If closing the pipe itself
// fails, that error is returned immediately without waiting on errChan.
func (w *minioWriter) Close() error {
	closeErr := w.pipe.Close()
	if closeErr != nil {
		return closeErr
	}
	uploadErr := <-w.errChan
	return uploadErr
}
// Writer returns an io.WriteCloser, created by newMinioWriter, for the object
// called name. The returned error is always nil.
func (c *minioClient) Writer(name string) (io.WriteCloser, error) {
	var wc io.WriteCloser = newMinioWriter(c, name)
	return wc, nil
}
// Walk calls fn with the key of each object that ListObjectsV2 yields for the
// prefix name in the client's bucket, listing recursively.
//
// Iteration stops at the first entry that carries a listing error or at the
// first non-nil error returned by fn; that error is returned. Walk returns
// nil when the listing is exhausted without error.
func (c *minioClient) Walk(name string, fn func(name string) error) error {
	// done is handed to ListObjectsV2 and closed when Walk returns, including
	// on early exit (presumably this tells the lister to stop; confirm
	// against the minio-go documentation).
	done := make(chan struct{})
	defer close(done)

	const recursive = true // always descend below the prefix
	objects := c.ListObjectsV2(c.bucket, name, recursive, done)
	for obj := range objects {
		err := obj.Err
		if err == nil {
			err = fn(obj.Key)
		}
		if err != nil {
			return err
		}
	}
	return nil
}
// limitReadCloser adds a Close method to a size-limited reader: reads go
// through the embedded io.Reader, while Close is forwarded to the minio
// object the reader was derived from.
type limitReadCloser struct {
	// Reader is the (limited) reader that serves Read calls.
	io.Reader
	// mObj is the underlying object that Close closes.
	mObj *minio.Object
}
// Close closes the wrapped minio object and returns its result.
func (l *limitReadCloser) Close() (err error) {
	err = l.mObj.Close()
	return err
}
// Reader opens the object called name in the client's bucket and returns a
// reader positioned offset bytes from its start.
//
// If size is greater than zero the returned reader yields at most size bytes;
// if size is zero the object itself is returned and reading is not limited.
// Closing the returned reader closes the underlying object.
//
// Errors from GetObject or Seek are returned as-is. If Seek fails, the object
// is closed before returning so it is not leaked.
func (c *minioClient) Reader(name string, offset uint64, size uint64) (io.ReadCloser, error) {
	obj, err := c.GetObject(c.bucket, name)
	if err != nil {
		return nil, err
	}
	// Position the object at the requested offset before handing it out.
	if _, err := obj.Seek(int64(offset), io.SeekStart); err != nil {
		// The caller never receives obj on this path, so release it here.
		// The Close error is dropped on purpose: the Seek failure is the
		// error the caller needs to see.
		obj.Close()
		return nil, err
	}
	if size == 0 {
		return obj, nil
	}

	// io.LimitReader takes an int64. A size above MaxInt64 would convert to a
	// negative limit and make the reader report EOF immediately, so clamp it.
	const maxInt64 = 1<<63 - 1
	limit := int64(maxInt64)
	if size < maxInt64 {
		limit = int64(size)
	}
	return &limitReadCloser{io.LimitReader(obj, limit), obj}, nil
}
// Delete removes the object called name from the client's bucket via
// RemoveObject and returns its error unchanged.
func (c *minioClient) Delete(name string) error {
	err := c.RemoveObject(c.bucket, name)
	return err
}
// Exists reports whether StatObject succeeds for the object called name in
// the client's bucket. Any StatObject error, whatever its cause, yields
// false.
func (c *minioClient) Exists(name string) bool {
	if _, err := c.StatObject(c.bucket, name); err != nil {
		return false
	}
	return true
}
// IsRetryable always reports false, regardless of err: per the original
// author's note, the minio client already retries internally, so callers
// should not retry on top of it.
func (c *minioClient) IsRetryable(err error) bool {
	const callerShouldRetry = false
	return callerShouldRetry
}
// IsIgnorable always reports false: no error from this client is treated as
// ignorable.
func (c *minioClient) IsIgnorable(err error) bool {
	const ignorable = false
	return ignorable
}
// sentinelErrResp is the zero-value minio.ErrorResponse. Its (empty) Code is
// compared against the result of minio.ToErrorResponse to recognise errors
// that carry no minio error code (presumably errors that are not minio error
// responses at all; confirm against the minio-go documentation).
var sentinelErrResp minio.ErrorResponse
// IsNotExist reports whether err, converted with minio.ToErrorResponse,
// carries the code "NoSuchKey" or "NoSuchBucket". A missing object and a
// missing bucket are both treated as "does not exist". Errors whose code
// equals the sentinel's code yield false.
func (c *minioClient) IsNotExist(err error) bool {
	code := minio.ToErrorResponse(err).Code
	if code == sentinelErrResp.Code {
		return false
	}
	switch code {
	case "NoSuchKey", "NoSuchBucket":
		return true
	default:
		return false
	}
}