Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

increase db retain period in ingesters to cover index cache validity period as well #3300

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 33 additions & 15 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/grafana/loki/pkg/ruler"
loki_storage "github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/stores/shipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/uploads"
serverutil "github.com/grafana/loki/pkg/util/server"
"github.com/grafana/loki/pkg/util/validation"
)
Expand Down Expand Up @@ -297,11 +298,13 @@ func (t *Loki) initStore() (_ services.Service, err error) {
Validity: t.cfg.StorageConfig.IndexCacheValidity - 1*time.Minute,
},
}
t.cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.cfg) + 2*time.Minute
case Querier, Ruler:
// We do not want query to do any updates to index
t.cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
default:
t.cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadWrite
t.cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.cfg) + 2*time.Minute
}
}

Expand All @@ -311,13 +314,13 @@ func (t *Loki) initStore() (_ services.Service, err error) {
}

if loki_storage.UsingBoltdbShipper(t.cfg.SchemaConfig.Configs) {
boltdbShipperMinIngesterQueryStoreDuration := boltdbShipperMinIngesterQueryStoreDuration(t.cfg)
switch t.cfg.Target {
case Querier, Ruler:
// Use AsyncStore to query both ingesters local store and chunk store for store queries.
// Only queriers should use the AsyncStore, it should never be used in ingesters.
chunkStore = loki_storage.NewAsyncStore(chunkStore, t.ingesterQuerier,
calculateAsyncStoreQueryIngestersWithin(t.cfg.Querier.QueryIngestersWithin,
t.cfg.Ingester.MaxChunkAge, t.cfg.StorageConfig.BoltDBShipperConfig.ResyncInterval),
calculateAsyncStoreQueryIngestersWithin(t.cfg.Querier.QueryIngestersWithin, boltdbShipperMinIngesterQueryStoreDuration),
)
case All:
// We want ingester to also query the store when using boltdb-shipper but only when running with target All.
Expand All @@ -329,7 +332,7 @@ func (t *Loki) initStore() (_ services.Service, err error) {
boltdbShipperConfigIdx++
}
mlb, err := calculateMaxLookBack(t.cfg.SchemaConfig.Configs[boltdbShipperConfigIdx], t.cfg.Ingester.QueryStoreMaxLookBackPeriod,
t.cfg.Ingester.MaxChunkAge, t.cfg.StorageConfig.BoltDBShipperConfig.ResyncInterval)
boltdbShipperMinIngesterQueryStoreDuration)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -566,34 +569,49 @@ func (t *Loki) initCompactor() (services.Service, error) {
return t.compactor, nil
}

func calculateMaxLookBack(pc chunk.PeriodConfig, maxLookBackConfig, maxChunkAge, querierResyncInterval time.Duration) (time.Duration, error) {
func calculateMaxLookBack(pc chunk.PeriodConfig, maxLookBackConfig, minDuration time.Duration) (time.Duration, error) {
if pc.ObjectType != shipper.FilesystemObjectStoreType && maxLookBackConfig.Nanoseconds() != 0 {
return 0, errors.New("it is an error to specify a non zero `query_store_max_look_back_period` value when using any object store other than `filesystem`")
}
// When using shipper, limit max look back for query to MaxChunkAge + upload interval by shipper + 15 mins to query only data whose index is not pushed yet
defaultMaxLookBack := maxChunkAge + shipper.UploadInterval + querierResyncInterval + (15 * time.Minute)

if maxLookBackConfig == 0 {
// If the QueryStoreMaxLookBackPeriod is still it's default value of 0, set it to the default calculated value.
return defaultMaxLookBack, nil
} else if maxLookBackConfig > 0 && maxLookBackConfig < defaultMaxLookBack {
// If the QueryStoreMaxLookBackPeriod is > 0 (-1 is allowed for infinite), make sure it's at least greater than the default or throw an error
// If the QueryStoreMaxLookBackPeriod is still its default value of 0, set it to minDuration.
return minDuration, nil
} else if maxLookBackConfig > 0 && maxLookBackConfig < minDuration {
// If the QueryStoreMaxLookBackPeriod is > 0 (-1 is allowed for infinite), make sure it's at least greater than minDuration or throw an error
return 0, fmt.Errorf("the configured query_store_max_look_back_period of '%v' is less than the calculated default of '%v' "+
"which is calculated based on the max_chunk_age + 15 minute boltdb-shipper interval + 15 min additional buffer. Increase this value"+
"greater than the default or remove it from the configuration to use the default", maxLookBackConfig, defaultMaxLookBack)
"greater than the default or remove it from the configuration to use the default", maxLookBackConfig, minDuration)
}
return maxLookBackConfig, nil
}

func calculateAsyncStoreQueryIngestersWithin(queryIngestersWithinConfig, maxChunkAge, querierResyncInterval time.Duration) time.Duration {
func calculateAsyncStoreQueryIngestersWithin(queryIngestersWithinConfig, minDuration time.Duration) time.Duration {
// 0 means do not limit queries, we would also not limit ingester queries from AsyncStore.
if queryIngestersWithinConfig == 0 {
return 0
}

minVal := maxChunkAge + shipper.UploadInterval + querierResyncInterval + (15 * time.Minute)
if queryIngestersWithinConfig < minVal {
return minVal
if queryIngestersWithinConfig < minDuration {
return minDuration
}
return queryIngestersWithinConfig
}

// boltdbShipperQuerierIndexUpdateDelay returns the duration it could take for queriers to serve the index since it was uploaded.
// It also considers index cache validity because a querier could have cached the index just before it was going to resync, which
// means it would keep serving that index until the cache entries expire.
func boltdbShipperQuerierIndexUpdateDelay(cfg Config) time.Duration {
	cacheValidity := cfg.StorageConfig.IndexCacheValidity
	resyncInterval := cfg.StorageConfig.BoltDBShipperConfig.ResyncInterval
	return cacheValidity + resyncInterval
}

// boltdbShipperIngesterIndexUploadDelay returns the duration it could take for an index file containing the id of a chunk
// to be uploaded to the shared store since the chunk got flushed.
func boltdbShipperIngesterIndexUploadDelay() time.Duration {
	delay := uploads.ShardDBsByDuration
	delay += shipper.UploadInterval
	return delay
}

// boltdbShipperMinIngesterQueryStoreDuration returns the minimum duration (with some buffer) for which ingesters should
// query their stores to avoid missing any logs or chunk ids due to the async nature of BoltDB Shipper.
func boltdbShipperMinIngesterQueryStoreDuration(cfg Config) time.Duration {
	// extra buffer on top of upload and querier-update delays
	const buffer = 2 * time.Minute
	return cfg.Ingester.MaxChunkAge +
		boltdbShipperIngesterIndexUploadDelay() +
		boltdbShipperQuerierIndexUpdateDelay(cfg) +
		buffer
}
83 changes: 14 additions & 69 deletions pkg/loki/modules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ import (

func Test_calculateMaxLookBack(t *testing.T) {
type args struct {
pc chunk.PeriodConfig
maxLookBackConfig time.Duration
maxChunkAge time.Duration
querierBoltDBFilesResyncInterval time.Duration
pc chunk.PeriodConfig
maxLookBackConfig time.Duration
minDuration time.Duration
}
tests := []struct {
name string
Expand All @@ -26,11 +25,10 @@ func Test_calculateMaxLookBack(t *testing.T) {
pc: chunk.PeriodConfig{
ObjectType: "filesystem",
},
maxLookBackConfig: 0,
maxChunkAge: 1 * time.Hour,
querierBoltDBFilesResyncInterval: 5 * time.Minute,
maxLookBackConfig: 0,
minDuration: time.Hour,
},
want: 81 * time.Minute,
want: time.Hour,
wantErr: false,
},
{
Expand All @@ -39,9 +37,8 @@ func Test_calculateMaxLookBack(t *testing.T) {
pc: chunk.PeriodConfig{
ObjectType: "filesystem",
},
maxLookBackConfig: -1,
maxChunkAge: 1 * time.Hour,
querierBoltDBFilesResyncInterval: 5 * time.Minute,
maxLookBackConfig: -1,
minDuration: time.Hour,
},
want: -1,
wantErr: false,
Expand All @@ -52,30 +49,28 @@ func Test_calculateMaxLookBack(t *testing.T) {
pc: chunk.PeriodConfig{
ObjectType: "gcs",
},
maxLookBackConfig: -1,
maxChunkAge: 1 * time.Hour,
querierBoltDBFilesResyncInterval: 5 * time.Minute,
maxLookBackConfig: -1,
minDuration: time.Hour,
},
want: 0,
wantErr: true,
},
{
name: "less than default",
name: "less than minDuration",
args: args{
pc: chunk.PeriodConfig{
ObjectType: "filesystem",
},
maxLookBackConfig: 1 * time.Hour,
maxChunkAge: 1 * time.Hour,
querierBoltDBFilesResyncInterval: 5 * time.Minute,
maxLookBackConfig: 1 * time.Hour,
minDuration: 2 * time.Hour,
},
want: 0,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := calculateMaxLookBack(tt.args.pc, tt.args.maxLookBackConfig, tt.args.maxChunkAge, tt.args.querierBoltDBFilesResyncInterval)
got, err := calculateMaxLookBack(tt.args.pc, tt.args.maxLookBackConfig, tt.args.minDuration)
if (err != nil) != tt.wantErr {
t.Errorf("calculateMaxLookBack() error = %v, wantErr %v", err, tt.wantErr)
return
Expand All @@ -86,53 +81,3 @@ func Test_calculateMaxLookBack(t *testing.T) {
})
}
}

func Test_calculateAsyncStoreQueryIngestersWithin(t *testing.T) {
type args struct {
queryIngestersWithin time.Duration
maxChunkAge time.Duration
querierBoltDBFilesResyncInterval time.Duration
}
tests := []struct {
name string
args args
want time.Duration
}{
{
name: "default",
args: args{
0,
time.Hour,
5 * time.Minute,
},
want: 0,
},
{
name: "queryIngestersWithin more than min val",
args: args{
3 * time.Hour,
time.Hour,
5 * time.Minute,
},
want: 3 * time.Hour,
},
{
name: "queryIngestersWithin less than min val",
args: args{
time.Hour,
time.Hour,
5 * time.Minute,
},
want: 81 * time.Minute,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := calculateAsyncStoreQueryIngestersWithin(tt.args.queryIngestersWithin, tt.args.maxChunkAge, tt.args.querierBoltDBFilesResyncInterval)

if got != tt.want {
t.Errorf("calculateAsyncStoreQueryIngestersWithin() got = %v, want %v", got, tt.want)
}
})
}
}
19 changes: 10 additions & 9 deletions pkg/storage/stores/shipper/shipper_index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,15 @@ type boltDBIndexClient interface {
}

type Config struct {
ActiveIndexDirectory string `yaml:"active_index_directory"`
SharedStoreType string `yaml:"shared_store"`
CacheLocation string `yaml:"cache_location"`
CacheTTL time.Duration `yaml:"cache_ttl"`
ResyncInterval time.Duration `yaml:"resync_interval"`
QueryReadyNumDays int `yaml:"query_ready_num_days"`
IngesterName string `yaml:"-"`
Mode int `yaml:"-"`
ActiveIndexDirectory string `yaml:"active_index_directory"`
SharedStoreType string `yaml:"shared_store"`
CacheLocation string `yaml:"cache_location"`
CacheTTL time.Duration `yaml:"cache_ttl"`
ResyncInterval time.Duration `yaml:"resync_interval"`
QueryReadyNumDays int `yaml:"query_ready_num_days"`
IngesterName string `yaml:"-"`
Mode int `yaml:"-"`
IngesterDBRetainPeriod time.Duration `yaml:"-"`
}

// RegisterFlags registers flags.
Expand Down Expand Up @@ -127,7 +128,7 @@ func (s *Shipper) init(storageClient chunk.ObjectClient, registerer prometheus.R
Uploader: uploader,
IndexDir: s.cfg.ActiveIndexDirectory,
UploadInterval: UploadInterval,
DBRetainPeriod: s.cfg.ResyncInterval + 2*time.Minute,
DBRetainPeriod: s.cfg.IngesterDBRetainPeriod,
}
uploadsManager, err := uploads.NewTableManager(cfg, s.boltDBIndexClient, prefixedObjectClient, registerer)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/stores/shipper/uploads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

const (
// create a new db sharded by time based on when write request is received
shardDBsByDuration = 15 * time.Minute
ShardDBsByDuration = 15 * time.Minute

// a temp file is created during uploads with name of the db + tempFileSuffix
tempFileSuffix = ".temp"
Expand Down Expand Up @@ -245,12 +245,12 @@ func (lt *Table) Write(ctx context.Context, writes local.TableWrites) error {
return lt.write(ctx, time.Now(), writes)
}

// write writes to a db locally. It shards the db files by truncating the passed time by shardDBsByDuration using https://golang.org/pkg/time/#Time.Truncate
// write writes to a db locally. It shards the db files by truncating the passed time by ShardDBsByDuration using https://golang.org/pkg/time/#Time.Truncate
// db files are named after the time shard i.e epoch of the truncated time.
// If a db file does not exist for a shard it gets created.
func (lt *Table) write(ctx context.Context, tm time.Time, writes local.TableWrites) error {
// do not write to files older than init time otherwise we might endup modifying file which was already created and uploaded before last shutdown.
shard := tm.Truncate(shardDBsByDuration).Unix()
shard := tm.Truncate(ShardDBsByDuration).Unix()
if shard < lt.modifyShardsSince {
shard = lt.modifyShardsSince
}
Expand Down Expand Up @@ -505,5 +505,5 @@ func loadBoltDBsFromDir(dir string) (map[string]*bbolt.DB, error) {
func getOldestActiveShardTime() time.Time {
// upload files excluding active shard. It could so happen that we just started a new shard but the file for last shard is still being updated due to pending writes or pending flush to disk.
// To avoid uploading it, excluding previous active shard as well if it has been not more than a minute since it became inactive.
return time.Now().Add(-time.Minute).Truncate(shardDBsByDuration)
return time.Now().Add(-time.Minute).Truncate(ShardDBsByDuration)
}
16 changes: 8 additions & 8 deletions pkg/storage/stores/shipper/uploads/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestTable_Write(t *testing.T) {
now := time.Now()

// allow modifying last 5 shards
table.modifyShardsSince = now.Add(-5 * shardDBsByDuration).Unix()
table.modifyShardsSince = now.Add(-5 * ShardDBsByDuration).Unix()

// a couple of times for which we want to do writes to make the table create different shards
testCases := []struct {
Expand All @@ -120,13 +120,13 @@ func TestTable_Write(t *testing.T) {
writeTime: now,
},
{
writeTime: now.Add(-(shardDBsByDuration + 5*time.Minute)),
writeTime: now.Add(-(ShardDBsByDuration + 5*time.Minute)),
},
{
writeTime: now.Add(-(shardDBsByDuration*3 + 3*time.Minute)),
writeTime: now.Add(-(ShardDBsByDuration*3 + 3*time.Minute)),
},
{
writeTime: now.Add(-6 * shardDBsByDuration), // write with time older than table.modifyShardsSince
writeTime: now.Add(-6 * ShardDBsByDuration), // write with time older than table.modifyShardsSince
dbName: fmt.Sprint(table.modifyShardsSince),
},
}
Expand All @@ -145,7 +145,7 @@ func TestTable_Write(t *testing.T) {

expectedDBName := tc.dbName
if expectedDBName == "" {
expectedDBName = fmt.Sprint(tc.writeTime.Truncate(shardDBsByDuration).Unix())
expectedDBName = fmt.Sprint(tc.writeTime.Truncate(ShardDBsByDuration).Unix())
}
db, ok := table.dbs[expectedDBName]
require.True(t, ok)
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestTable_Upload(t *testing.T) {
// write a batch to another shard
batch = boltIndexClient.NewWriteBatch()
testutil.AddRecordsToBatch(batch, "test", 20, 10)
require.NoError(t, table.write(context.Background(), now.Add(shardDBsByDuration), batch.(*local.BoltWriteBatch).Writes["test"]))
require.NoError(t, table.write(context.Background(), now.Add(ShardDBsByDuration), batch.(*local.BoltWriteBatch).Writes["test"]))

// upload the dbs to storage
require.NoError(t, table.Upload(context.Background(), true))
Expand Down Expand Up @@ -382,9 +382,9 @@ func TestTable_ImmutableUploads(t *testing.T) {

// some dbs to setup
dbNames := []int64{
shardCutoff.Add(-shardDBsByDuration).Unix(), // inactive shard, should upload
shardCutoff.Add(-ShardDBsByDuration).Unix(), // inactive shard, should upload
shardCutoff.Add(-1 * time.Minute).Unix(), // 1 minute before shard cutoff, should upload
time.Now().Truncate(shardDBsByDuration).Unix(), // active shard, should not upload
time.Now().Truncate(ShardDBsByDuration).Unix(), // active shard, should not upload
}

dbs := map[string]testutil.DBRecords{}
Expand Down