forked from grafana/loki
-
Notifications
You must be signed in to change notification settings - Fork 1
/
oss_object_client.go
164 lines (144 loc) · 5.1 KB
/
oss_object_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package alibaba
import (
"context"
"flag"
"io"
"net/http"
"strconv"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/instrument"
"github.com/grafana/loki/pkg/storage/chunk/client"
)
const NoSuchKeyErr = "NoSuchKey"
var ossRequestDuration = instrument.NewHistogramCollector(prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "loki",
Name: "oss_request_duration_seconds",
Help: "Time spent doing OSS requests.",
Buckets: prometheus.ExponentialBuckets(0.005, 4, 7),
}, []string{"operation", "status_code"}))
func init() {
ossRequestDuration.Register()
}
type OssObjectClient struct {
defaultBucket *oss.Bucket
}
// OssConfig is config for the OSS Chunk Client.
type OssConfig struct {
Bucket string `yaml:"bucket"`
Endpoint string `yaml:"endpoint"`
AccessKeyID string `yaml:"access_key_id"`
SecretAccessKey string `yaml:"secret_access_key"`
}
// RegisterFlags registers flags.
func (cfg *OssConfig) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("", f)
}
// RegisterFlagsWithPrefix registers flags with prefix.
func (cfg *OssConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.Bucket, prefix+"oss.bucketname", "", "Name of OSS bucket.")
f.StringVar(&cfg.Endpoint, prefix+"oss.endpoint", "", "oss Endpoint to connect to.")
f.StringVar(&cfg.AccessKeyID, prefix+"oss.access-key-id", "", "alibabacloud Access Key ID")
f.StringVar(&cfg.SecretAccessKey, prefix+"oss.secret-access-key", "", "alibabacloud Secret Access Key")
}
// NewOssObjectClient makes a new chunk.Client that writes chunks to OSS.
func NewOssObjectClient(_ context.Context, cfg OssConfig) (client.ObjectClient, error) {
client, err := oss.New(cfg.Endpoint, cfg.AccessKeyID, cfg.SecretAccessKey)
if err != nil {
return nil, err
}
bucket, err := client.Bucket(cfg.Bucket)
if err != nil {
return nil, err
}
return &OssObjectClient{
defaultBucket: bucket,
}, nil
}
func (s *OssObjectClient) Stop() {
}
// GetObject returns a reader and the size for the specified object key from the configured OSS bucket.
func (s *OssObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
var resp *oss.GetObjectResult
var options []oss.Option
err := instrument.CollectedRequest(ctx, "OSS.GetObject", ossRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
var requestErr error
resp, requestErr = s.defaultBucket.DoGetObject(&oss.GetObjectRequest{ObjectKey: objectKey}, options)
if requestErr != nil {
return requestErr
}
return nil
})
if err != nil {
return nil, 0, err
}
length := resp.Response.Headers.Get("Content-Length")
size, err := strconv.Atoi(length)
if err != nil {
return nil, 0, err
}
return resp.Response.Body, int64(size), err
}
// PutObject puts the specified bytes into the configured OSS bucket at the provided key
func (s *OssObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
return instrument.CollectedRequest(ctx, "OSS.PutObject", ossRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
if err := s.defaultBucket.PutObject(objectKey, object); err != nil {
return errors.Wrap(err, "failed to put oss object")
}
return nil
})
}
// List implements chunk.ObjectClient.
func (s *OssObjectClient) List(ctx context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
var storageObjects []client.StorageObject
var commonPrefixes []client.StorageCommonPrefix
marker := oss.Marker("")
for {
if ctx.Err() != nil {
return nil, nil, ctx.Err()
}
objects, err := s.defaultBucket.ListObjects(oss.Prefix(prefix), oss.Delimiter(delimiter), marker)
if err != nil {
return nil, nil, errors.Wrap(err, "list alibaba oss bucket failed")
}
marker = oss.Marker(objects.NextMarker)
for _, object := range objects.Objects {
storageObjects = append(storageObjects, client.StorageObject{
Key: object.Key,
ModifiedAt: object.LastModified,
})
}
for _, object := range objects.CommonPrefixes {
if object != "" {
commonPrefixes = append(commonPrefixes, client.StorageCommonPrefix(object))
}
}
if !objects.IsTruncated {
break
}
}
return storageObjects, commonPrefixes, nil
}
// DeleteObject deletes the specified object key from the configured OSS bucket.
func (s *OssObjectClient) DeleteObject(ctx context.Context, objectKey string) error {
return instrument.CollectedRequest(ctx, "OSS.DeleteObject", ossRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
err := s.defaultBucket.DeleteObject(objectKey)
if err != nil {
return err
}
return nil
})
}
// IsObjectNotFoundErr returns true if error means that object is not found. Relevant to GetObject and DeleteObject operations.
func (s *OssObjectClient) IsObjectNotFoundErr(err error) bool {
switch caseErr := err.(type) {
case oss.ServiceError:
if caseErr.Code == NoSuchKeyErr && caseErr.StatusCode == http.StatusNotFound {
return true
}
return false
default:
return false
}
}