
Loki: Allow configuring query_store_max_look_back_period when running a filesystem store and boltdb-shipper #2073

Merged: 7 commits, May 14, 2020
7 changes: 7 additions & 0 deletions docs/configuration/README.md
@@ -317,6 +317,13 @@ The `ingester_config` block configures Ingesters.
# The maximum duration of a timeseries chunk in memory. If a timeseries runs for longer than this, the current chunk will be flushed to the store and a new chunk created.
[max_chunk_age: <duration> | default = 1h]

# How far back in time an ingester is allowed to query the store for data.
# This is only useful when running multiple Loki binaries with a shared ring and a `filesystem` store that is NOT shared between the binaries.
# When using any shared object store such as S3 or GCS, this value must be left at 0.
# It is an error to configure a non-zero value when using any object store other than `filesystem`.
# Use a value of -1 to allow the ingester to query the store infinitely far back in time.
[query_store_max_look_back_period: <duration> | default = 0]

```
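Taken together with `max_chunk_age`, the new setting might be used as follows; this is an illustrative sketch, not a recommendation, and all values are examples:

```yaml
ingester:
  max_chunk_age: 2h
  # Allow this instance to query its local store up to 7 days back.
  # Only valid with a non-shared `filesystem` object store; use -1 for infinite.
  query_store_max_look_back_period: 168h
```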
### lifecycler_config
16 changes: 16 additions & 0 deletions docs/operations/storage/boltdb-shipper.md
@@ -83,5 +83,21 @@ Frequency for checking updates can be configured with `resync_interval` config.
To avoid keeping downloaded index files forever, they have a TTL which defaults to 24 hours: if the index files for a period are not used for 24 hours, they are removed from the cache location.
The TTL can be configured using the `cache_ttl` config.

## Horizontal scaling of non-shared filesystem stores

Using boltdb-shipper also allows running the single-binary Loki (or really just the ingesters) with a `filesystem` object store while still using a shared hash ring.

If you configure a shared ring via etcd, Consul, or memberlist, you can run multiple instances of Loki on separate machines with separate filesystems;
this works because the ingesters are able to query the store directly.

To enable this configuration, set `query_store_max_look_back_period` in the ingester config according to how far back you want each instance to query its store, or use a value of -1 for infinite look-back.

At query time, any Loki instance can field the query; that instance then uses the ring to ask every other Loki instance for relevant data,
and because each ingester can query its own store as far back as `query_store_max_look_back_period` allows, the correct data can be returned.

Scaling up is as easy as adding more Loki instances and letting them join the same ring.

Scaling down is possible but manual: you must shut down the Loki instance and then physically copy its chunks directory, including the index files, in its entirety to another Loki instance.
Contributor:
I think the other instance would also have index files with the same name, so it would not be just a copy of files. I guess we would have to provide a tool to merge boltdb files. We will have to make that clear in the docs until we have such a tool.
What do you think?

Collaborator Author:

Each ingester should be writing files with a unique name, so you should be able to copy one's files to another without conflicts:

uploader := fmt.Sprintf("%s-%d", s.cfg.IngesterName, time.Now().Unix())

However, I think I may have found a bug here. This is what I see for my current test ingesters; it looks like the lifecycler ID was empty when they created their files:

ed@ed-VirtualBox:/tmp$ ls loki1/chunks/index/index_2628
-1589422114
ed@ed-VirtualBox:/tmp$ ls loki2/chunks/index/index_2628
-1589422113

These still wouldn't collide because the timestamps were different, but they should also have the unique lifecycler ID in front.

My suspicion here is that they created the files before joining the ring. Can you take a look at this?

Collaborator Author:

Looking at how this works, there is a real chicken-and-egg problem when initializing the shipper and trying to get the ingester ID to use in the file name.

I'm not sure it's possible to do this, but maybe you have some ideas?

I'm wondering if we should instead try a different strategy and use the hostname, or allow the name to be configured directly in the YAML?

You cannot move them partially; it must be the entire directory. The other Loki instance will then find the boltdb index files and serve the copied chunks.
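For orientation, a per-instance config for such a deployment might look roughly like the sketch below, assuming a memberlist ring; the field names are taken from the boltdb-shipper docs of this era and all paths/values are illustrative:

```yaml
ingester:
  lifecycler:
    ring:
      kvstore:
        store: memberlist
  # -1: each instance may query its own local store infinitely far back
  query_store_max_look_back_period: -1

storage_config:
  boltdb_shipper:
    active_index_directory: /loki/index
    shared_store: filesystem
  filesystem:
    directory: /loki/chunks
```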
slim-bean marked this conversation as resolved.


5 changes: 3 additions & 2 deletions pkg/ingester/ingester.go
@@ -64,7 +64,7 @@ type Config struct {
ingesterClientFactory func(cfg client.Config, addr string) (client.HealthAndIngesterClient, error)

QueryStore bool `yaml:"-"`
Contributor:

We can maybe get rid of this because we now mostly rely on QueryStoreMaxLookBackPeriod to enable/disable querying the store from ingesters.

Contributor:

What @sandeepsukhani is saying seems correct?

- QueryStoreMaxLookBackPeriod time.Duration `yaml:"-"`
+ QueryStoreMaxLookBackPeriod time.Duration `yaml:"query_store_max_look_back_period"`
}

// RegisterFlags registers the flags.
Expand All @@ -84,6 +84,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.Float64Var(&cfg.SyncMinUtilization, "ingester.sync-min-utilization", 0, "Minimum utilization of chunk when doing synchronization.")
f.IntVar(&cfg.MaxReturnedErrors, "ingester.max-ignored-stream-errors", 10, "Maximum number of ignored stream errors to return. 0 to return all errors.")
f.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", time.Hour, "Maximum chunk age before flushing.")
f.DurationVar(&cfg.QueryStoreMaxLookBackPeriod, "ingester.query-store-max-look-back-period", 0, "How far back should an ingester be allowed to query the store for data, for use only with boltdb-shipper index and filesystem object store. -1 for infinite.")
Contributor:

I think the requirement should be boltdb (not boltdb-shipper) as the index store and filesystem as the object store, because there is no use in having boltdb-shipper without a shared object store; otherwise you would be copying files somewhere on the same filesystem, which is essentially the same as not using boltdb-shipper at all. I might be wrong here or might be missing your point. What do you think?

Collaborator Author:

If my above assumption is true and the file names are unique, I specified boltdb-shipper so that you could copy the files between ingesters to allow scaling down; otherwise, what you said is true and there isn't much reason to use boltdb-shipper for this rather than boltdb.

With the exception of one other reason I could think of: it would be easier to take backups or even move Loki to another host, because the index files can be copied directly along with the chunks directory while Loki is still running. I don't think this is true for boltdb; I don't think you can safely copy an open boltdb file.

}

// Ingester builds chunks for incoming log streams.
@@ -414,7 +415,7 @@ func buildStoreRequest(cfg Config, req *logproto.QueryRequest) *logproto.QueryRe
}
start := req.Start
end := req.End
- if cfg.QueryStoreMaxLookBackPeriod != 0 {
+ if cfg.QueryStoreMaxLookBackPeriod > 0 {
oldestStartTime := time.Now().Add(-cfg.QueryStoreMaxLookBackPeriod)
if oldestStartTime.After(req.Start) {
start = oldestStartTime
39 changes: 32 additions & 7 deletions pkg/loki/modules.go
@@ -1,6 +1,7 @@
package loki

import (
"errors"
"fmt"
"net/http"
"os"
@@ -198,10 +199,14 @@ func (t *Loki) initIngester() (_ services.Service, err error) {
t.cfg.Ingester.LifecyclerConfig.ListenPort = t.cfg.Server.GRPCListenPort

// We want ingester to also query the store when using boltdb-shipper
- if activeIndexType(t.cfg.SchemaConfig) == local.BoltDBShipperType {
+ pc := activePeriodConfig(t.cfg.SchemaConfig)
+ if pc.IndexType == local.BoltDBShipperType {
  t.cfg.Ingester.QueryStore = true
- // When using shipper, limit max look back for query to MaxChunkAge + upload interval by shipper + 15 mins to query only data whose index is not pushed yet
- t.cfg.Ingester.QueryStoreMaxLookBackPeriod = t.cfg.Ingester.MaxChunkAge + local.ShipperFileUploadInterval + (15 * time.Minute)
+ mlb, err := calculateMaxLookBack(pc, t.cfg.Ingester.QueryStoreMaxLookBackPeriod, t.cfg.Ingester.MaxChunkAge)
+ if err != nil {
+ return nil, err
+ }
+ t.cfg.Ingester.QueryStoreMaxLookBackPeriod = mlb
}

t.ingester, err = ingester.New(t.cfg.Ingester, t.cfg.IngesterClient, t.store, t.overrides)
@@ -256,7 +261,7 @@ func (t *Loki) initTableManager() (services.Service, error) {
}

func (t *Loki) initStore() (_ services.Service, err error) {
- if activeIndexType(t.cfg.SchemaConfig) == local.BoltDBShipperType {
+ if activePeriodConfig(t.cfg.SchemaConfig).IndexType == local.BoltDBShipperType {
t.cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.cfg.Ingester.LifecyclerConfig.ID
switch t.cfg.Target {
case Ingester:
@@ -483,15 +488,35 @@ var modules = map[moduleName]module{
},
}

- // activeIndexType type returns index type which would be applicable to logs that would be pushed starting now
+ // activePeriodConfig returns the PeriodConfig which would be applicable to logs that would be pushed starting now
  // Note: Another periodic config can be applicable in future which can change index type
- func activeIndexType(cfg chunk.SchemaConfig) string {
+ func activePeriodConfig(cfg chunk.SchemaConfig) chunk.PeriodConfig {
now := model.Now()
i := sort.Search(len(cfg.Configs), func(i int) bool {
return cfg.Configs[i].From.Time > now
})
if i > 0 {
i--
}
- return cfg.Configs[i].IndexType
+ return cfg.Configs[i]
}

func calculateMaxLookBack(pc chunk.PeriodConfig, maxLookBackConfig, maxChunkAge time.Duration) (time.Duration, error) {
Contributor:

That deserves a test, and it seems simple.

Collaborator Author:

lol, yeah, I thought I put it in the description or somewhere that I wanted to test this but hadn't done it yet.

if pc.ObjectType != local.FilesystemObjectStoreType && maxLookBackConfig.Milliseconds() != 0 {
return 0, errors.New("it is an error to specify a non zero `query_store_max_look_back_period` value when using any object store other than `filesystem`")
}
// When using shipper, limit max look back for query to MaxChunkAge + upload interval by shipper + 15 mins to query only data whose index is not pushed yet
defaultMaxLookBack := maxChunkAge + local.ShipperFileUploadInterval + (15 * time.Minute)

if maxLookBackConfig == 0 {
// If the QueryStoreMaxLookBackPeriod is still its default value of 0, set it to the default calculated value.
return defaultMaxLookBack, nil
} else if maxLookBackConfig > 0 && maxLookBackConfig < defaultMaxLookBack {
// If the QueryStoreMaxLookBackPeriod is > 0 (-1 is allowed for infinite), make sure it is at least as large as the calculated default or throw an error
return 0, fmt.Errorf("the configured query_store_max_look_back_period of '%v' is less than the calculated default of '%v' "+
"which is calculated as max_chunk_age + the 15 minute boltdb-shipper upload interval + a 15 minute additional buffer. Increase this value "+
"to be greater than the default or remove it from the configuration to use the default", maxLookBackConfig, defaultMaxLookBack)
}
return maxLookBackConfig, nil
}
6 changes: 3 additions & 3 deletions pkg/loki/modules_test.go
@@ -48,20 +48,20 @@ func TestActiveIndexType(t *testing.T) {
IndexType: "first",
}}

- assert.Equal(t, "first", activeIndexType(cfg))
+ assert.Equal(t, cfg.Configs[0], activePeriodConfig(cfg))

// add a newer PeriodConfig in the past which should be considered
cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{
From: chunk.DayTime{Time: model.Now().Add(-12 * time.Hour)},
IndexType: "second",
})
- assert.Equal(t, "second", activeIndexType(cfg))
+ assert.Equal(t, cfg.Configs[1], activePeriodConfig(cfg))

// add a newer PeriodConfig in the future which should not be considered
cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{
From: chunk.DayTime{Time: model.Now().Add(time.Hour)},
IndexType: "third",
})
- assert.Equal(t, "second", activeIndexType(cfg))
+ assert.Equal(t, cfg.Configs[1], activePeriodConfig(cfg))

}
5 changes: 4 additions & 1 deletion pkg/storage/stores/local/shipper.go
@@ -34,6 +34,9 @@ const (
// BoltDBShipperType holds the index type for using boltdb with shipper which keeps flushing them to a shared storage
BoltDBShipperType = "boltdb-shipper"

// FilesystemObjectStoreType holds the periodic config type for the filesystem store
FilesystemObjectStoreType = "filesystem"

cacheCleanupInterval = 24 * time.Hour
storageKeyPrefix = "index/"
)
@@ -128,7 +131,7 @@ func NewShipper(cfg ShipperConfig, storageClient chunk.ObjectClient, boltDBGette
// avoid uploading same files again with a different name. If the file does not exist we create one with the uploader name set to
// the ingester name and startup timestamp so that the name is randomised and we do not overwrite files from other ingesters.
func (s *Shipper) getUploaderName() (string, error) {
- uploader := fmt.Sprintf("%s-%d", s.cfg.IngesterName, time.Now().Unix())
+ uploader := fmt.Sprintf("%s-%d", s.cfg.IngesterName, time.Now().UnixNano())

uploaderFilePath := path.Join(s.cfg.ActiveIndexDirectory, "uploader", "name")
if err := chunk_util.EnsureDirectory(path.Dir(uploaderFilePath)); err != nil {