forked from thanos-io/thanos
-
Notifications
You must be signed in to change notification settings - Fork 0
/
gcs.go
187 lines (164 loc) · 5.05 KB
/
gcs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
// Package gcs implements common object storage abstractions against Google Cloud Storage.
package gcs
import (
"context"
"fmt"
"io"
"math/rand"
"runtime"
"strings"
"testing"
"time"
"cloud.google.com/go/storage"
"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/pkg/errors"
"github.com/prometheus/common/version"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
yaml "gopkg.in/yaml.v2"
)
// DirDelim is the delimiter used to model a directory structure in an object store bucket.
const DirDelim = "/"
// Config stores the configuration for gcs bucket.
type Config struct {
Bucket string `yaml:"bucket"`
}
// Bucket implements the store.Bucket and shipper.Bucket interfaces against GCS.
type Bucket struct {
logger log.Logger
bkt *storage.BucketHandle
name string
closer io.Closer
}
// NewBucket returns a new Bucket against the given bucket handle.
func NewBucket(ctx context.Context, logger log.Logger, conf []byte, component string) (*Bucket, error) {
var gc Config
if err := yaml.Unmarshal(conf, &gc); err != nil {
return nil, err
}
if gc.Bucket == "" {
return nil, errors.New("missing Google Cloud Storage bucket name for stored blocks")
}
gcsOptions := option.WithUserAgent(fmt.Sprintf("thanos-%s/%s (%s)", component, version.Version, runtime.Version()))
gcsClient, err := storage.NewClient(ctx, gcsOptions)
if err != nil {
return nil, err
}
bkt := &Bucket{
logger: logger,
bkt: gcsClient.Bucket(gc.Bucket),
closer: gcsClient,
name: gc.Bucket,
}
return bkt, nil
}
// Name returns the bucket name for gcs.
func (b *Bucket) Name() string {
return b.name
}
// Iter calls f for each entry in the given directory. The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) error {
// Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
// object itself as one prefix item.
if dir != "" {
dir = strings.TrimSuffix(dir, DirDelim) + DirDelim
}
it := b.bkt.Objects(ctx, &storage.Query{
Prefix: dir,
Delimiter: DirDelim,
})
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
attrs, err := it.Next()
if err == iterator.Done {
return nil
}
if err != nil {
return err
}
if err := f(attrs.Prefix + attrs.Name); err != nil {
return err
}
}
}
// Get returns a reader for the given object name.
func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
return b.bkt.Object(name).NewReader(ctx)
}
// GetRange returns a new range reader for the given object name and range.
func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
return b.bkt.Object(name).NewRangeReader(ctx, off, length)
}
// Handle returns the underlying GCS bucket handle.
// Used for testing purposes (we return handle, so it is not instrumented).
func (b *Bucket) Handle() *storage.BucketHandle {
return b.bkt
}
// Exists checks if the given object exists.
func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
if _, err := b.bkt.Object(name).Attrs(ctx); err == nil {
return true, nil
} else if err != storage.ErrObjectNotExist {
return false, err
}
return false, nil
}
// Upload writes the file specified in src to remote GCS location specified as target.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
w := b.bkt.Object(name).NewWriter(ctx)
if _, err := io.Copy(w, r); err != nil {
return err
}
return w.Close()
}
// Delete removes the object with the given name.
func (b *Bucket) Delete(ctx context.Context, name string) error {
return b.bkt.Object(name).Delete(ctx)
}
// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
func (b *Bucket) IsObjNotFoundErr(err error) bool {
return err == storage.ErrObjectNotExist
}
func (b *Bucket) Close() error {
return b.closer.Close()
}
// NewTestBucket creates test bkt client that before returning creates temporary bucket.
// In a close function it empties and deletes the bucket.
func NewTestBucket(t testing.TB, project string) (objstore.Bucket, func(), error) {
ctx, cancel := context.WithCancel(context.Background())
src := rand.NewSource(time.Now().UnixNano())
gTestConfig := Config{
Bucket: fmt.Sprintf("test_%s_%x", strings.ToLower(t.Name()), src.Int63()),
}
bc, err := yaml.Marshal(gTestConfig)
if err != nil {
return nil, nil, err
}
b, err := NewBucket(ctx, log.NewNopLogger(), bc, "thanos-e2e-test")
if err != nil {
cancel()
return nil, nil, err
}
if err = b.bkt.Create(ctx, project, nil); err != nil {
cancel()
_ = b.Close()
return nil, nil, err
}
t.Log("created temporary GCS bucket for GCS tests with name", b.name, "in project", project)
return b, func() {
objstore.EmptyBucket(t, ctx, b)
if err := b.bkt.Delete(ctx); err != nil {
t.Logf("deleting bucket failed: %s", err)
}
cancel()
if err := b.Close(); err != nil {
t.Logf("closing bucket failed: %s", err)
}
}, nil
}