forked from grafana/loki
/
schema_config.go
448 lines (391 loc) · 14.4 KB
/
schema_config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
package chunk
import (
"errors"
"flag"
"fmt"
"os"
"strconv"
"time"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/mtime"
yaml "gopkg.in/yaml.v2"
"github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/math"
)
const (
	// Conversion factors between bucketing periods and the units used in
	// bucket arithmetic: seconds for computing hour/day indices from Unix
	// timestamps, milliseconds for the relative offsets stored in Bucket.
	secondsInHour      = int64(time.Hour / time.Second)
	secondsInDay       = int64(24 * time.Hour / time.Second)
	millisecondsInHour = int64(time.Hour / time.Millisecond)
	millisecondsInDay  = int64(24 * time.Hour / time.Millisecond)
)
var (
	// Sentinel errors returned by SchemaConfig / PeriodConfig validation.
	errInvalidSchemaVersion     = errors.New("invalid schema version")
	errInvalidTablePeriod       = errors.New("the table period must be a multiple of 24h (1h for schema v1)")
	errConfigFileNotSet         = errors.New("schema config file needs to be set")
	errConfigChunkPrefixNotSet  = errors.New("schema config for chunks is missing the 'prefix' setting")
	errSchemaIncreasingFromTime = errors.New("from time in schemas must be distinct and in increasing order")
)
// PeriodConfig defines the schema and tables to use for a period of time
type PeriodConfig struct {
	From       DayTime             `yaml:"from"`         // start of the period this config applies to (day-aligned)
	IndexType  string              `yaml:"store"`        // type of index client to use.
	ObjectType string              `yaml:"object_store"` // type of object client to use; if omitted, defaults to store.
	Schema     string              `yaml:"schema"`       // schema version, e.g. "v1".."v11"; see CreateSchema
	IndexTables PeriodicTableConfig `yaml:"index"`       // periodic table config for the index
	ChunkTables PeriodicTableConfig `yaml:"chunks"`      // periodic table config for chunks
	RowShards  uint32              `yaml:"row_shards"`   // number of row shards; defaulted by applyDefaults, required > 0 for v10/v11
}
// DayTime is a model.Time what holds day-aligned values, and marshals to/from
// YAML in YYYY-MM-DD format.
type DayTime struct {
	model.Time // embedded millisecond-precision timestamp
}
// MarshalYAML implements yaml.Marshaller, emitting the day in
// YYYY-MM-DD form via String.
func (d DayTime) MarshalYAML() (interface{}, error) {
	formatted := d.String()
	return formatted, nil
}
// UnmarshalYAML implements yaml.Unmarshaller, parsing a YYYY-MM-DD
// scalar into the embedded model.Time.
func (d *DayTime) UnmarshalYAML(unmarshal func(interface{}) error) error {
	var raw string
	if err := unmarshal(&raw); err != nil {
		return err
	}
	parsed, err := time.Parse("2006-01-02", raw)
	if err != nil {
		return err
	}
	d.Time = model.TimeFromUnix(parsed.Unix())
	return nil
}
// String returns the day in YYYY-MM-DD format.
//
// Uses a value receiver (rather than the previous pointer receiver) so
// that DayTime values satisfy fmt.Stringer; MarshalYAML already calls
// String on a value receiver. Value-receiver methods are in the method
// set of both DayTime and *DayTime, so existing callers are unaffected.
func (d DayTime) String() string {
	return d.Time.Time().Format("2006-01-02")
}
// SchemaConfig contains the config for our chunk index schemas
type SchemaConfig struct {
	Configs []PeriodConfig `yaml:"configs"`

	// fileName is the path to the YAML config; set by the
	// -schema-config-file flag and read by loadFromFile.
	fileName string
}
// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *SchemaConfig) RegisterFlags(f *flag.FlagSet) {
	const usage = "The path to the schema config file. The schema config is used only when running Cortex with the chunks storage."
	f.StringVar(&cfg.fileName, "schema-config-file", "", usage)
}
// loadFromFile loads the schema config from a yaml file. It returns
// errConfigFileNotSet when no file name was configured, and decodes
// strictly so unknown YAML fields are rejected.
func (cfg *SchemaConfig) loadFromFile() error {
	if cfg.fileName == "" {
		return errConfigFileNotSet
	}

	f, err := os.Open(cfg.fileName)
	if err != nil {
		return err
	}
	// Previously the file handle was never closed, leaking a descriptor
	// on every load.
	defer f.Close()

	decoder := yaml.NewDecoder(f)
	decoder.SetStrict(true)
	return decoder.Decode(&cfg)
}
// Validate the schema config and returns an error if the validation
// doesn't pass
func (cfg *SchemaConfig) Validate() error {
	n := len(cfg.Configs)
	for i := 0; i < n; i++ {
		pc := &cfg.Configs[i]
		pc.applyDefaults()
		if err := pc.validate(); err != nil {
			return err
		}
		// The last entry has no successor to compare against.
		if i == n-1 {
			continue
		}
		// Each period must start strictly after the previous one.
		if pc.From.Time.Unix() >= cfg.Configs[i+1].From.Time.Unix() {
			return errSchemaIncreasingFromTime
		}
	}
	return nil
}
// defaultRowShards returns the default row-shard count for a schema
// version: zero for the legacy schemas (v1-v6, v9), which do not shard
// rows, and 16 for everything newer.
func defaultRowShards(schema string) uint32 {
	switch schema {
	case "v1", "v2", "v3", "v4", "v5", "v6", "v9":
		return 0
	}
	return 16
}
// ForEachAfter will call f() on every entry after t, splitting
// entries if necessary so there is an entry starting at t
func (cfg *SchemaConfig) ForEachAfter(t model.Time, f func(config *PeriodConfig)) {
	for i := 0; i < len(cfg.Configs); i++ {
		// t falls strictly inside entry i's period (after its From, before
		// the next entry's From, or i is the last entry): split entry i so
		// the tail starting at t is its own config.
		if t > cfg.Configs[i].From.Time &&
			(i+1 == len(cfg.Configs) || t < cfg.Configs[i+1].From.Time) {
			// Split the i'th entry by duplicating then overwriting the From time.
			// The overlapping append inserts a copy of entry i at position i+1
			// (append's element copy is memmove-like, so the overlap is safe);
			// the copy's From is then rewritten to t.
			cfg.Configs = append(cfg.Configs[:i+1], cfg.Configs[i:]...)
			cfg.Configs[i+1].From = DayTime{t}
		}
		// Invoke f on every entry starting at or after t (including a
		// freshly split tail, which is visited on a later iteration).
		if cfg.Configs[i].From.Time >= t {
			f(&cfg.Configs[i])
		}
	}
}
// validateChunks checks that stores which shard chunks into periodic
// tables have a table prefix configured. The object store defaults to
// the index store when object_store is omitted.
func validateChunks(cfg PeriodConfig) error {
	store := cfg.ObjectType
	if store == "" {
		store = cfg.IndexType
	}
	switch store {
	case "cassandra", "aws-dynamo", "bigtable-hashed", "gcp", "gcp-columnkey", "bigtable", "grpc-store":
		if cfg.ChunkTables.Prefix == "" {
			return errConfigChunkPrefixNotSet
		}
	}
	return nil
}
// CreateSchema returns the schema defined by the PeriodConfig
func (cfg PeriodConfig) CreateSchema() (BaseSchema, error) {
	buckets, bucketsPeriod := cfg.createBucketsFunc()

	// Ensure the tables period is a multiple of the bucket period
	if cfg.IndexTables.Period > 0 && cfg.IndexTables.Period%bucketsPeriod != 0 {
		return nil, errInvalidTablePeriod
	}
	if cfg.ChunkTables.Period > 0 && cfg.ChunkTables.Period%bucketsPeriod != 0 {
		return nil, errInvalidTablePeriod
	}

	switch cfg.Schema {
	case "v1", "v2":
		// v1 and v2 share the original entry encoding.
		return newStoreSchema(buckets, originalEntries{}), nil
	case "v3":
		return newStoreSchema(buckets, base64Entries{originalEntries{}}), nil
	case "v4":
		return newStoreSchema(buckets, labelNameInHashKeyEntries{}), nil
	case "v5":
		return newStoreSchema(buckets, v5Entries{}), nil
	case "v6":
		return newStoreSchema(buckets, v6Entries{}), nil
	case "v9":
		return newSeriesStoreSchema(buckets, v9Entries{}), nil
	case "v10", "v11":
		// Sharded schemas require an explicit positive shard count.
		if cfg.RowShards == 0 {
			return nil, fmt.Errorf("Must have row_shards > 0 (current: %d) for schema (%s)", cfg.RowShards, cfg.Schema)
		}
		sharded := v10Entries{rowShards: cfg.RowShards}
		if cfg.Schema == "v11" {
			return newSeriesStoreSchema(buckets, v11Entries{sharded}), nil
		}
		return newSeriesStoreSchema(buckets, sharded), nil
	default:
		return nil, errInvalidSchemaVersion
	}
}
// createBucketsFunc picks the bucketing function and its period for
// this schema version: v1 buckets hourly, everything else daily.
func (cfg PeriodConfig) createBucketsFunc() (schemaBucketsFunc, time.Duration) {
	if cfg.Schema == "v1" {
		return cfg.hourlyBuckets, time.Hour
	}
	return cfg.dailyBuckets, 24 * time.Hour
}
// applyDefaults fills in the schema-version-specific row shard count
// when none was configured.
func (cfg *PeriodConfig) applyDefaults() {
	if cfg.RowShards != 0 {
		return
	}
	cfg.RowShards = defaultRowShards(cfg.Schema)
}
// Validate the period config: check the chunk-table settings, then
// confirm the schema version actually constructs.
func (cfg PeriodConfig) validate() error {
	if err := validateChunks(cfg); err != nil {
		return err
	}
	_, err := cfg.CreateSchema()
	return err
}
// Load the yaml file, or build the config from legacy command-line flags
func (cfg *SchemaConfig) Load() error {
	// Already populated (e.g. from legacy flags): nothing to load.
	if len(cfg.Configs) != 0 {
		return nil
	}

	// Load config from file.
	if err := cfg.loadFromFile(); err != nil {
		return err
	}

	return cfg.Validate()
}
// Bucket describes a range of time with a tableName and hashKey
type Bucket struct {
	from       uint32 // offset in ms of the range start, relative to the bucket start
	through    uint32 // offset in ms of the range end, relative to the bucket start
	tableName  string // periodic table the bucket lives in
	hashKey    string // "<userID>:<hour>" or "<userID>:d<day>" depending on bucketing
	bucketSize uint32 // helps with deletion of series ids in series store. Size in milliseconds.
}
// hourlyBuckets returns one Bucket per hour overlapped by [from, through],
// with from/through expressed as millisecond offsets within each hour.
// Used by schema v1.
func (cfg *PeriodConfig) hourlyBuckets(from, through model.Time, userID string) []Bucket {
	firstHour := from.Unix() / secondsInHour
	lastHour := through.Unix() / secondsInHour
	buckets := []Bucket{}

	for hour := firstHour; hour <= lastHour; hour++ {
		hourStartMs := hour * millisecondsInHour
		// Clamp the interval to this hour's boundaries.
		relFrom := math.Max64(0, int64(from)-hourStartMs)
		relThrough := math.Min64(millisecondsInHour, int64(through)-hourStartMs)
		buckets = append(buckets, Bucket{
			from:       uint32(relFrom),
			through:    uint32(relThrough),
			tableName:  cfg.IndexTables.TableFor(model.TimeFromUnix(hour * secondsInHour)),
			hashKey:    fmt.Sprintf("%s:%d", userID, hour),
			bucketSize: uint32(millisecondsInHour), // helps with deletion of series ids in series store
		})
	}
	return buckets
}
// dailyBuckets returns one Bucket per day overlapped by [from, through],
// with from/through expressed as millisecond offsets within each day.
func (cfg *PeriodConfig) dailyBuckets(from, through model.Time, userID string) []Bucket {
	firstDay := from.Unix() / secondsInDay
	lastDay := through.Unix() / secondsInDay
	buckets := []Bucket{}

	for day := firstDay; day <= lastDay; day++ {
		// The hash key contains the bucket start time (rounded to the nearest
		// day), and the range key carries the offsets from that to the
		// (start/end) of the chunk. For chunks that span multiple buckets the
		// offsets are capped to the bucket boundaries, i.e. start is positive
		// in the first bucket, then zero in the next, etc.
		//
		// Keeping the timestamps relative keeps them small enough for a
		// uint32, which we then have to base 32 encode.
		dayStartMs := day * millisecondsInDay
		relFrom := math.Max64(0, int64(from)-dayStartMs)
		relThrough := math.Min64(millisecondsInDay, int64(through)-dayStartMs)
		buckets = append(buckets, Bucket{
			from:       uint32(relFrom),
			through:    uint32(relThrough),
			tableName:  cfg.IndexTables.TableFor(model.TimeFromUnix(day * secondsInDay)),
			hashKey:    fmt.Sprintf("%s:d%d", userID, day),
			bucketSize: uint32(millisecondsInDay), // helps with deletion of series ids in series store
		})
	}
	return buckets
}
// PeriodicTableConfig is configuration for a set of time-sharded tables.
type PeriodicTableConfig struct {
	Prefix string        // table name prefix; the period index is appended (see tableForPeriod)
	Period time.Duration // length of each table's period; 0 means a single non-periodic table
	Tags   Tags          // tags applied to created tables
}
// UnmarshalYAML implements the yaml.Unmarshaler interface. It decodes
// through an intermediate struct so Period can be parsed as a
// human-readable model.Duration.
func (cfg *PeriodicTableConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
	var raw struct {
		Prefix string         `yaml:"prefix"`
		Period model.Duration `yaml:"period"`
		Tags   Tags           `yaml:"tags"`
	}
	if err := unmarshal(&raw); err != nil {
		return err
	}

	*cfg = PeriodicTableConfig{
		Prefix: raw.Prefix,
		Period: time.Duration(raw.Period),
		Tags:   raw.Tags,
	}
	return nil
}
// MarshalYAML implements the yaml.Marshaler interface. It encodes
// through an intermediate struct so Period is rendered as a
// human-readable model.Duration.
func (cfg PeriodicTableConfig) MarshalYAML() (interface{}, error) {
	type plain struct {
		Prefix string         `yaml:"prefix"`
		Period model.Duration `yaml:"period"`
		Tags   Tags           `yaml:"tags"`
	}
	return &plain{
		Prefix: cfg.Prefix,
		Period: model.Duration(cfg.Period),
		Tags:   cfg.Tags,
	}, nil
}
// AutoScalingConfig for DynamoDB tables.
type AutoScalingConfig struct {
	Enabled     bool    `yaml:"enabled"`      // whether autoscaling is enabled for the table
	RoleARN     string  `yaml:"role_arn"`     // AWS AutoScaling role ARN
	MinCapacity int64   `yaml:"min_capacity"` // minimum provisioned capacity
	MaxCapacity int64   `yaml:"max_capacity"` // maximum provisioned capacity
	OutCooldown int64   `yaml:"out_cooldown"` // minimum seconds between scale-ups
	InCooldown  int64   `yaml:"in_cooldown"`  // minimum seconds between scale-downs
	TargetValue float64 `yaml:"target"`       // target ratio of consumed to provisioned capacity
}
// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *AutoScalingConfig) RegisterFlags(argPrefix string, f *flag.FlagSet) {
	// All flag names share the caller-supplied prefix.
	name := func(suffix string) string { return argPrefix + suffix }

	f.BoolVar(&cfg.Enabled, name(".enabled"), false, "Should we enable autoscale for the table.")
	f.StringVar(&cfg.RoleARN, name(".role-arn"), "", "AWS AutoScaling role ARN")
	f.Int64Var(&cfg.MinCapacity, name(".min-capacity"), 3000, "DynamoDB minimum provision capacity.")
	f.Int64Var(&cfg.MaxCapacity, name(".max-capacity"), 6000, "DynamoDB maximum provision capacity.")
	f.Int64Var(&cfg.OutCooldown, name(".out-cooldown"), 1800, "DynamoDB minimum seconds between each autoscale up.")
	f.Int64Var(&cfg.InCooldown, name(".in-cooldown"), 1800, "DynamoDB minimum seconds between each autoscale down.")
	f.Float64Var(&cfg.TargetValue, name(".target-value"), 80, "DynamoDB target ratio of consumed capacity to provisioned capacity.")
}
// periodicTables computes the TableDescs that should exist for the
// [from, through] interval, honouring the retention window and marking
// each table as active (full write throughput) or inactive depending on
// whether "now" falls within its period extended by the grace intervals.
func (cfg *PeriodicTableConfig) periodicTables(from, through model.Time, pCfg ProvisionConfig, beginGrace, endGrace time.Duration, retention time.Duration) []TableDesc {
	var (
		periodSecs     = int64(cfg.Period / time.Second)
		beginGraceSecs = int64(beginGrace / time.Second)
		endGraceSecs   = int64(endGrace / time.Second)
		firstTable     = from.Unix() / periodSecs
		lastTable      = through.Unix() / periodSecs
		tablesToKeep   = int64(retention/time.Second) / periodSecs
		now            = mtime.Now().Unix()
		nowWeek        = now / periodSecs
		result         = []TableDesc{}
	)
	// If interval ends exactly on a period boundary, don't include the upcoming period
	if through.Unix()%periodSecs == 0 {
		lastTable--
	}
	// Don't make tables further back than the configured retention
	if retention > 0 && lastTable > tablesToKeep && lastTable-firstTable >= tablesToKeep {
		firstTable = lastTable - tablesToKeep
	}
	for i := firstTable; i <= lastTable; i++ {
		tableName := cfg.tableForPeriod(i)
		table := TableDesc{}

		// if now is within table [start - grace, end + grace), then we need some write throughput
		if (i*periodSecs)-beginGraceSecs <= now && now < (i*periodSecs)+periodSecs+endGraceSecs {
			table = pCfg.ActiveTableProvisionConfig.BuildTableDesc(tableName, cfg.Tags)

			level.Debug(log.Logger).Log("msg", "Table is Active",
				"tableName", table.Name,
				"provisionedRead", table.ProvisionedRead,
				"provisionedWrite", table.ProvisionedWrite,
				"useOnDemandMode", table.UseOnDemandIOMode,
				"useWriteAutoScale", table.WriteScale.Enabled,
				"useReadAutoScale", table.ReadScale.Enabled)
		} else {
			// Autoscale last N tables
			// this is measured against "now", since the lastWeek is the final week in the schema config range
			// the N last tables in that range will always be set to the inactive scaling settings.
			disableAutoscale := i < (nowWeek - pCfg.InactiveWriteScaleLastN)
			table = pCfg.InactiveTableProvisionConfig.BuildTableDesc(tableName, cfg.Tags, disableAutoscale)

			level.Debug(log.Logger).Log("msg", "Table is Inactive",
				"tableName", table.Name,
				"provisionedRead", table.ProvisionedRead,
				"provisionedWrite", table.ProvisionedWrite,
				"useOnDemandMode", table.UseOnDemandIOMode,
				"useWriteAutoScale", table.WriteScale.Enabled,
				"useReadAutoScale", table.ReadScale.Enabled)
		}

		result = append(result, table)
	}
	return result
}
// ChunkTableFor calculates the chunk table shard for a given point in time.
func (cfg SchemaConfig) ChunkTableFor(t model.Time) (string, error) {
	for i := range cfg.Configs {
		pc := &cfg.Configs[i]
		if t < pc.From.Time {
			continue
		}
		// t is covered by this period if it is the last one, or if t
		// precedes the next period's start.
		if i+1 == len(cfg.Configs) || t < cfg.Configs[i+1].From.Time {
			return pc.ChunkTables.TableFor(t), nil
		}
	}
	return "", fmt.Errorf("no chunk table found for time %v", t)
}
// TableFor calculates the table shard for a given point in time.
func (cfg *PeriodicTableConfig) TableFor(t model.Time) string {
	// A zero period means a single, non-periodic table.
	if cfg.Period == 0 {
		return cfg.Prefix
	}
	return cfg.tableForPeriod(t.Unix() / int64(cfg.Period/time.Second))
}
// tableForPeriod builds the table name for period index i by appending
// it to the configured prefix.
//
// Uses strconv.FormatInt rather than strconv.Itoa(int(i)) so the int64
// index is never truncated on platforms where int is 32 bits.
func (cfg *PeriodicTableConfig) tableForPeriod(i int64) string {
	return cfg.Prefix + strconv.FormatInt(i, 10)
}