forked from grafana/loki
-
Notifications
You must be signed in to change notification settings - Fork 0
/
object_client.go
205 lines (174 loc) · 6.39 KB
/
object_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
package client
import (
"bytes"
"context"
"encoding/base64"
"io"
"strings"
"time"
"github.com/pkg/errors"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/config"
)
// ObjectClient is used to store arbitrary data in Object Store (S3/GCS/Azure/...)
type ObjectClient interface {
// PutObject writes the full contents of object under objectKey, overwriting any existing object.
PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error
// GetObject returns a reader for the object at objectKey together with its size in bytes.
// NOTE: The consumer of GetObject should always call the Close method when it is done reading which otherwise could cause a resource leak.
GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error)
// List objects with given prefix.
//
// If delimiter is empty, all objects are returned, even if they are in nested in "subdirectories".
// If delimiter is not empty, it is used to compute common prefixes ("subdirectories"),
// and objects containing delimiter in the name will not be returned in the result.
//
// For example, if the prefix is "notes/" and the delimiter is a slash (/) as in "notes/summer/july", the common prefix is "notes/summer/".
// Common prefixes will always end with passed delimiter.
//
// Keys of returned storage objects have given prefix.
List(ctx context.Context, prefix string, delimiter string) ([]StorageObject, []StorageCommonPrefix, error)
// DeleteObject removes the object at objectKey from the store.
DeleteObject(ctx context.Context, objectKey string) error
// IsObjectNotFoundErr reports whether err is this backend's "object does not exist" error.
IsObjectNotFoundErr(err error) bool
// Stop releases any resources held by the client; the client must not be used afterwards.
Stop()
}
// StorageObject represents an object being stored in an Object Store
type StorageObject struct {
// Key is the full object key, including any prefix passed to List.
Key string
// ModifiedAt is the object's last-modification time as reported by the backend.
ModifiedAt time.Time
}
// StorageCommonPrefix represents a common prefix aka a synthetic directory in Object Store.
// It is guaranteed to always end with delimiter passed to List method.
type StorageCommonPrefix string
// KeyEncoder is used to encode chunk keys before writing/retrieving chunks
// from the underlying ObjectClient.
// Schema/Chunk are passed as arguments to allow this to improve over revisions;
// a nil KeyEncoder means the raw external chunk key is used as the object key.
type KeyEncoder func(schema config.SchemaConfig, chk chunk.Chunk) string
// base64Encoder converts a chunk key to its standard-alphabet base64 form
// before the key is handed to the underlying ObjectClient.
var base64Encoder = func(key string) string {
	raw := []byte(key)
	return base64.StdEncoding.EncodeToString(raw)
}
// FSEncoder produces the on-disk object key for a chunk.
//
// Pre-v12 schemas encode the whole chunk key as one opaque base64 string,
// which hides the key structure and piles every chunk into one directory,
// hurting performance at scale and discoverability. From schema v12 on,
// the directory layout implied by the chunk key is kept and only the last
// path component is base64-encoded.
var FSEncoder = func(schema config.SchemaConfig, chk chunk.Chunk) string {
	key := schema.ExternalKey(chk.ChunkRef)
	if schema.VersionForChunk(chk.ChunkRef) <= 11 {
		// Legacy behaviour: one flat, fully base64-encoded key.
		return base64Encoder(key)
	}
	sep := strings.LastIndexByte(key, '/')
	return key[:sep] + "/" + base64Encoder(key[sep+1:])
}
// defaultMaxParallel is the fallback bound on concurrent GetObject calls
// when a client is created without an explicit limit.
const defaultMaxParallel = 150
// client is used to store chunks in object store backends
type client struct {
store ObjectClient
// keyEncoder maps a chunk to its object key; nil means use the schema's external key directly.
keyEncoder KeyEncoder
// getChunkMaxParallel bounds concurrent object fetches in GetChunks; 0 selects defaultMaxParallel.
getChunkMaxParallel int
schema config.SchemaConfig
}
// NewClient wraps the provided ObjectClient with a chunk.Client implementation
// using the default bound on parallel chunk fetches. encoder may be nil, in
// which case raw external chunk keys are used as object keys.
func NewClient(store ObjectClient, encoder KeyEncoder, schema config.SchemaConfig) Client {
return NewClientWithMaxParallel(store, encoder, defaultMaxParallel, schema)
}
// NewClientWithMaxParallel wraps the provided ObjectClient with a chunk.Client
// implementation, bounding concurrent chunk fetches to maxParallel
// (0 falls back to defaultMaxParallel at fetch time).
func NewClientWithMaxParallel(store ObjectClient, encoder KeyEncoder, maxParallel int, schema config.SchemaConfig) Client {
return &client{
store: store,
keyEncoder: encoder,
getChunkMaxParallel: maxParallel,
schema: schema,
}
}
// Stop shuts down the object store and any underlying clients.
// The client must not be used after Stop returns.
func (o *client) Stop() {
o.store.Stop()
}
// PutChunks stores the provided chunks in the configured backend. All puts
// run concurrently; if multiple errors are returned, the last one received
// is propagated up.
func (o *client) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
	// The number of keys/buffers is known up front, so pre-size both slices
	// instead of growing them append by append.
	chunkKeys := make([]string, 0, len(chunks))
	chunkBufs := make([][]byte, 0, len(chunks))
	for i := range chunks {
		buf, err := chunks[i].Encoded()
		if err != nil {
			return err
		}
		var key string
		if o.keyEncoder != nil {
			key = o.keyEncoder(o.schema, chunks[i])
		} else {
			key = o.schema.ExternalKey(chunks[i].ChunkRef)
		}
		chunkKeys = append(chunkKeys, key)
		chunkBufs = append(chunkBufs, buf)
	}
	// Buffered to capacity so no PutObject goroutine ever blocks on send;
	// each goroutine can finish and exit as soon as its put completes.
	incomingErrors := make(chan error, len(chunkBufs))
	for i := range chunkBufs {
		go func(i int) {
			incomingErrors <- o.store.PutObject(ctx, chunkKeys[i], bytes.NewReader(chunkBufs[i]))
		}(i)
	}
	// Drain one result per chunk, keeping only the last error seen.
	var lastErr error
	for range chunkKeys {
		if err := <-incomingErrors; err != nil {
			lastErr = err
		}
	}
	return lastErr
}
// GetChunks retrieves the specified chunks from the configured backend,
// fetching at most the client's configured number of objects in parallel
// (defaultMaxParallel when no limit was configured).
func (o *client) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
	maxParallel := o.getChunkMaxParallel
	if maxParallel == 0 {
		maxParallel = defaultMaxParallel
	}
	return util.GetParallelChunks(ctx, maxParallel, chunks, o.getChunk)
}
// getChunk fetches a single chunk's object from the store and decodes it
// into c. It fails fast when ctx is already cancelled and wraps all store
// and decode errors with a stack trace.
func (o *client) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, c chunk.Chunk) (chunk.Chunk, error) {
	if ctx.Err() != nil {
		return chunk.Chunk{}, ctx.Err()
	}
	// Only compute the plain external key when no encoder is configured,
	// mirroring PutChunks/DeleteChunk instead of always computing it and
	// discarding it when an encoder is set.
	var key string
	if o.keyEncoder != nil {
		key = o.keyEncoder(o.schema, c)
	} else {
		key = o.schema.ExternalKey(c.ChunkRef)
	}
	readCloser, size, err := o.store.GetObject(ctx, key)
	if err != nil {
		return chunk.Chunk{}, errors.WithStack(err)
	}
	if readCloser == nil {
		return chunk.Chunk{}, errors.New("object client getChunk fail because object is nil")
	}
	defer readCloser.Close()
	// adds bytes.MinRead to avoid allocations when the size is known.
	// This is because ReadFrom reads bytes.MinRead by bytes.MinRead.
	buf := bytes.NewBuffer(make([]byte, 0, size+bytes.MinRead))
	if _, err := buf.ReadFrom(readCloser); err != nil {
		return chunk.Chunk{}, errors.WithStack(err)
	}
	if err := c.Decode(decodeContext, buf.Bytes()); err != nil {
		return chunk.Chunk{}, errors.WithStack(err)
	}
	return c, nil
}
// DeleteChunk removes the chunk identified by chunkID from the configured backend.
// (Original comment incorrectly said "GetChunks retrieves…" — copy-paste error.)
func (o *client) DeleteChunk(ctx context.Context, userID, chunkID string) error {
key := chunkID
if o.keyEncoder != nil {
// Stored object keys were produced by the encoder, so the external
// chunk ID must be parsed and re-encoded to address the right object.
c, err := chunk.ParseExternalKey(userID, key)
if err != nil {
return err
}
key = o.keyEncoder(o.schema, c)
}
return o.store.DeleteObject(ctx, key)
}
// IsChunkNotFoundErr reports whether err indicates the requested chunk's
// object does not exist, by delegating to the underlying ObjectClient.
func (o *client) IsChunkNotFoundErr(err error) bool {
return o.store.IsObjectNotFoundErr(err)
}