Loki: Allow configuring query_store_max_look_back_period when running a filesystem store and boltdb-shipper #2073

Merged (7 commits) on May 14, 2020
7 changes: 7 additions & 0 deletions docs/configuration/README.md
@@ -317,6 +317,13 @@ The `ingester_config` block configures Ingesters.
# The maximum duration of a timeseries chunk in memory. If a timeseries runs for longer than this the current chunk will be flushed to the store and a new chunk created.
[max_chunk_age: <duration> | default = 1h]

# How far in the past an ingester is allowed to query the store for data.
# This is only useful when running multiple Loki binaries with a shared ring and a `filesystem` store which is NOT shared between the binaries.
# When using any "shared" object store like S3 or GCS this value must always be left as 0.
# It is an error to configure this to a non-zero value when using any object store other than `filesystem`.
# Use a value of -1 to allow the ingester to query the store infinitely far back in time.
[query_store_max_look_back_period: <duration> | default = 0]

```
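
For illustration, a minimal ingester block using this setting might look like the following sketch (the `-1` value is only one option; it allows unlimited look-back as described above):

```yaml
ingester:
  # Allow this ingester to query its local filesystem store arbitrarily far back in time.
  query_store_max_look_back_period: -1
```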

### lifecycler_config
3 changes: 2 additions & 1 deletion docs/operations/storage/README.md
@@ -26,6 +26,7 @@ The following are supported for the index:
* [Google Bigtable](https://cloud.google.com/bigtable)
* [Apache Cassandra](https://cassandra.apache.org)
* [BoltDB](https://github.com/boltdb/bolt) (doesn't work when clustering Loki)
* [BoltDB Shipper](boltdb-shipper.md) EXPERIMENTAL index store which stores BoltDB index files in the object store

The following are supported for the chunks:

@@ -34,7 +35,7 @@ The following are supported for the chunks:
* [Apache Cassandra](https://cassandra.apache.org)
* [Amazon S3](https://aws.amazon.com/s3)
* [Google Cloud Storage](https://cloud.google.com/storage/)
* Filesystem (doesn't work when clustering Loki)
* [Filesystem](filesystem.md) (please read more about the filesystem store to understand the pros/cons before using it with production data)

## Cloud Storage Permissions

141 changes: 141 additions & 0 deletions docs/operations/storage/filesystem.md
@@ -0,0 +1,141 @@
# Filesystem Object Store

The filesystem object store is the easiest way to get started with Loki, but there are some pros and cons to this approach.

Very simply it stores all the objects (chunks) in the specified directory:

```yaml
storage_config:
  filesystem:
    directory: /tmp/loki/
```

A folder is created for every tenant, and all the chunks for one tenant are stored in that directory.

If Loki is run in single-tenant mode, all the chunks are put in a folder named `fake`, which is the placeholder tenant name used for single-tenant mode.

See [multi-tenancy](../multi-tenancy.md) for more information.
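
For illustration, the resulting on-disk layout in single-tenant mode might look roughly like this (a sketch; actual chunk file names will differ):

```
/tmp/loki/
  fake/           # tenant directory; single-tenant mode uses the tenant name `fake`
    <chunk file>
    <chunk file>
    ...
```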

## Pros

Very simple: no additional software is required to use Loki when paired with the BoltDB index store.

Great for low-volume applications, proofs of concept, and just playing around with Loki.

## Cons

### Scaling

At some point there is a limit to how many chunks can be stored in a single directory. For example, see [this issue](https://github.com/grafana/loki/issues/1502), which explains how a Loki user ran into a strange error with about **5.5 million chunk files** in their file store (and also describes a workaround for the problem).

However, if you keep the number of streams low (remember, Loki writes a chunk per stream), settings like `chunk_target_size` (around 1MB), `max_chunk_age` (increase beyond 1h), and `chunk_idle_period` (increase to match `max_chunk_age`) can be tweaked to reduce the number of chunks flushed (although this trades for more memory consumption).

It's still very possible to store terabytes of log data with the filesystem store, but be aware there are limits to how many files a filesystem will happily keep in a single directory.
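
As a rough sketch of the tuning described above (the values are illustrative and mirror the example config later in this document, not recommendations for every workload):

```yaml
ingester:
  chunk_target_size: 1048576  # aim for ~1MB chunks before flushing
  max_chunk_age: 2h           # raise beyond the 1h default before an age-based flush
  chunk_idle_period: 2h       # keep idle chunks in memory longer, matching max_chunk_age
```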

### Durability

The durability of the objects is at the mercy of the filesystem itself, whereas other object stores like S3/GCS do a lot behind the scenes to offer extremely high durability for your data.

### High Availability

Running Loki clustered is not possible with the filesystem store unless the filesystem is shared in some fashion (NFS, for example). However, using shared filesystems is likely going to be a bad experience with Loki, just as it is for almost every other application.

## New AND VERY EXPERIMENTAL in 1.5.0: Horizontal scaling of the filesystem store

**WARNING**: as the title suggests, this is very new and potentially buggy, and it is also very likely that the configs around this feature will change over time.

With that warning out of the way, the addition of the [boltdb-shipper](boltdb-shipper.md) index store makes it possible to overcome many of the limitations listed above: specifically, you can run Loki with the filesystem store on separate machines that still operate as a cluster, supporting replication and write distribution via the hash ring.

As mentioned in the title, this is very alpha at this point, but we would love for people to try this and help us flush out bugs.

Here is an example config to run with Loki:

Use this config on multiple computers (or containers); do not run more than one instance on the same computer, as Loki uses the hostname as the ID in the ring.

Do not use a shared filesystem such as NFS for this; each machine should have its own filesystem.

```yaml
auth_enabled: false # single tenant mode

server:
  http_listen_port: 3100

ingester:
  max_transfer_retries: 0 # Disable blocks transfers on ingesters shutdown or rollout.
  chunk_idle_period: 2h # Let chunks sit idle for at least 2h before flushing, this helps to reduce total chunks in store
  max_chunk_age: 2h # Let chunks get at least 2h old before flushing due to age, this helps to reduce total chunks in store
  chunk_target_size: 1048576 # Target chunks of 1MB, this helps to reduce total chunks in store
  chunk_retain_period: 30s

  query_store_max_look_back_period: -1 # This will allow the ingesters to query the store for all data
  lifecycler:
    heartbeat_period: 5s
    interface_names:
      - eth0
    join_after: 30s
    num_tokens: 512
    ring:
      heartbeat_timeout: 1m
      kvstore:
        consul:
          consistent_reads: true
          host: localhost:8500
          http_client_timeout: 20s
        store: consul
      replication_factor: 1 # This can be increased and probably should if you are running multiple machines!

schema_config:
  configs:
    - from: 2018-04-15
      store: boltdb-shipper
      object_store: filesystem
      schema: v11
      index:
        prefix: index_
        period: 168h

storage_config:
  boltdb_shipper:
    shared_store: filesystem
    active_index_directory: /tmp/loki/index
    cache_location: /tmp/loki/boltdb-cache
  filesystem:
    directory: /tmp/loki/chunks

limits_config:
  enforce_metric_name: false
  reject_old_samples: true
  reject_old_samples_max_age: 168h

chunk_store_config:
  max_look_back_period: 0s # No limit how far we can look back in the store

table_manager:
  retention_deletes_enabled: false
  retention_period: 0s # No deletions, infinite retention
```

It does require Consul to be running for the ring (any of the ring stores will work: consul, etcd, or memberlist; Consul is used in this example).

It is also required that Consul be available from each machine; this example only specifies `host: localhost:8500`, and you would likely need to change this to the correct hostname/IP and port of your Consul server.

**The config needs to be the same on every Loki instance!**

The important piece of this config is `query_store_max_look_back_period: -1`; this tells Loki to allow the ingesters to look in the store for all the data.

Traffic can be sent to any of the Loki servers; it can be round-robin load balanced if desired.

Each Loki instance will use Consul to properly route both read and write data to the correct Loki instance.

Scaling up is as easy as adding more Loki instances and letting them talk to the same ring.

Scaling down is harder but possible. You would need to shut down a Loki server and then take everything in:

```yaml
filesystem:
  directory: /tmp/loki/chunks
```

And copy it to the same directory on another Loki server; there is currently no way to split the chunks between servers, so you must move them all. We expect to provide more options here in the future.


67 changes: 59 additions & 8 deletions pkg/ingester/ingester.go
@@ -10,6 +10,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/user"
"google.golang.org/grpc/health/grpc_health_v1"

@@ -25,6 +26,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
listutil "github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)

@@ -64,7 +66,7 @@ type Config struct {
ingesterClientFactory func(cfg client.Config, addr string) (client.HealthAndIngesterClient, error)

QueryStore bool `yaml:"-"`
Contributor: we can maybe get rid of this because now we mostly rely on QueryStoreMaxLookBackPeriod to enable/disable querying the store from ingesters.

Contributor: What @sandeepsukhani is saying seems correct?

QueryStoreMaxLookBackPeriod time.Duration `yaml:"-"`
QueryStoreMaxLookBackPeriod time.Duration `yaml:"query_store_max_look_back_period"`
}

// RegisterFlags registers the flags.
@@ -84,6 +86,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.Float64Var(&cfg.SyncMinUtilization, "ingester.sync-min-utilization", 0, "Minimum utilization of chunk when doing synchronization.")
f.IntVar(&cfg.MaxReturnedErrors, "ingester.max-ignored-stream-errors", 10, "Maximum number of ignored stream errors to return. 0 to return all errors.")
f.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", time.Hour, "Maximum chunk age before flushing.")
f.DurationVar(&cfg.QueryStoreMaxLookBackPeriod, "ingester.query-store-max-look-back-period", 0, "How far back should an ingester be allowed to query the store for data, for use only with boltdb-shipper index and filesystem object store. -1 for infinite.")
Contributor: I think the requirement should be boltdb (not boltdb_shipper) as the index store and filesystem as the object store, because there is no use in having boltdb_shipper without a shared object store; otherwise you would be copying files somewhere on the same filesystem, which is essentially the same as not using boltdb_shipper at all. I might be wrong here or might be missing your point. What do you think?

Collaborator (Author): If my above assumption is true, that the file names are unique, I specified boltdb_shipper so that you could copy the files between ingesters to allow scaling down; otherwise what you said is true, and there isn't much reason to use boltdb_shipper for this rather than boltdb.

With the exception of one other reason I could think of: it would be easier to take backups or even move Loki to another host, because the index files can be copied directly along with the chunks directory while Loki is still running. I don't think this is true for boltdb; I don't think you can safely copy an open boltdb file.

}

// Ingester builds chunks for incoming log streams.
@@ -311,7 +314,50 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
    }

    instance := i.getOrCreateInstance(instanceID)
    return instance.Label(ctx, req)
    resp, err := instance.Label(ctx, req)
    if err != nil {
        return nil, err
    }

    // Only continue if we should query the store for labels
    if !i.cfg.QueryStore {
        return resp, nil
    }

    // Only continue if the store is a chunk.Store
    var cs chunk.Store
    var ok bool
    if cs, ok = i.store.(chunk.Store); !ok {
        return resp, nil
    }

    userID, err := user.ExtractOrgID(ctx)
    if err != nil {
        return nil, err
    }
    // Adjust the start time based on QueryStoreMaxLookBackPeriod.
    start := adjustQueryStartTime(i.cfg, *req.Start)
    if start.After(*req.End) {
        // The request is older than we are allowed to query the store, just return what we have.
        return resp, nil
    }
    from, through := model.TimeFromUnixNano(start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano())
    var storeValues []string
    if req.Values {
        storeValues, err = cs.LabelValuesForMetricName(ctx, userID, from, through, "logs", req.Name)
        if err != nil {
            return nil, err
        }
    } else {
        storeValues, err = cs.LabelNamesForMetricName(ctx, userID, from, through, "logs")
        if err != nil {
            return nil, err
        }
    }

    return &logproto.LabelResponse{
        Values: listutil.MergeStringLists(resp.Values, storeValues),
    }, nil
}

// Series queries the ingester for log stream identifiers (label sets) matching a set of matchers
@@ -414,12 +460,7 @@ func buildStoreRequest(cfg Config, req *logproto.QueryRequest) *logproto.QueryRe
    }
    start := req.Start
    end := req.End
    if cfg.QueryStoreMaxLookBackPeriod != 0 {
        oldestStartTime := time.Now().Add(-cfg.QueryStoreMaxLookBackPeriod)
        if oldestStartTime.After(req.Start) {
            start = oldestStartTime
        }
    }
    start = adjustQueryStartTime(cfg, start)

    if start.After(end) {
        return nil
@@ -431,3 +472,13 @@ func buildStoreRequest(cfg Config, req *logproto.QueryRequest) *logproto.QueryRe

return &newRequest
}

func adjustQueryStartTime(cfg Config, start time.Time) time.Time {
    if cfg.QueryStoreMaxLookBackPeriod > 0 {
        oldestStartTime := time.Now().Add(-cfg.QueryStoreMaxLookBackPeriod)
        if oldestStartTime.After(start) {
            start = oldestStartTime
        }
    }
    return start
}
43 changes: 35 additions & 8 deletions pkg/loki/modules.go
@@ -1,6 +1,7 @@
package loki

import (
"errors"
"fmt"
"net/http"
"os"
@@ -163,7 +164,9 @@ func (t *Loki) initQuerier() (services.Service, error) {
    if err != nil {
        return nil, err
    }

    if t.cfg.Ingester.QueryStoreMaxLookBackPeriod != 0 {
        t.cfg.Querier.IngesterQueryStoreMaxLookback = t.cfg.Ingester.QueryStoreMaxLookBackPeriod
    }
    t.querier, err = querier.New(t.cfg.Querier, t.cfg.IngesterClient, t.ring, t.store, t.overrides)
    if err != nil {
        return nil, err
@@ -198,10 +201,14 @@ func (t *Loki) initIngester() (_ services.Service, err error) {
    t.cfg.Ingester.LifecyclerConfig.ListenPort = t.cfg.Server.GRPCListenPort

    // We want ingester to also query the store when using boltdb-shipper
    if activeIndexType(t.cfg.SchemaConfig) == local.BoltDBShipperType {
    pc := activePeriodConfig(t.cfg.SchemaConfig)
    if pc.IndexType == local.BoltDBShipperType {
        t.cfg.Ingester.QueryStore = true
        // When using shipper, limit max look back for query to MaxChunkAge + upload interval by shipper + 15 mins to query only data whose index is not pushed yet
        t.cfg.Ingester.QueryStoreMaxLookBackPeriod = t.cfg.Ingester.MaxChunkAge + local.ShipperFileUploadInterval + (15 * time.Minute)
        mlb, err := calculateMaxLookBack(pc, t.cfg.Ingester.QueryStoreMaxLookBackPeriod, t.cfg.Ingester.MaxChunkAge)
        if err != nil {
            return nil, err
        }
        t.cfg.Ingester.QueryStoreMaxLookBackPeriod = mlb
    }

    t.ingester, err = ingester.New(t.cfg.Ingester, t.cfg.IngesterClient, t.store, t.overrides)
@@ -256,7 +263,7 @@ func (t *Loki) initTableManager() (services.Service, error) {
}

func (t *Loki) initStore() (_ services.Service, err error) {
    if activeIndexType(t.cfg.SchemaConfig) == local.BoltDBShipperType {
    if activePeriodConfig(t.cfg.SchemaConfig).IndexType == local.BoltDBShipperType {
        t.cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.cfg.Ingester.LifecyclerConfig.ID
        switch t.cfg.Target {
        case Ingester:
@@ -483,15 +490,35 @@ var modules = map[moduleName]module{
},
}

// activeIndexType type returns index type which would be applicable to logs that would be pushed starting now
// activePeriodConfig returns the active PeriodConfig which would be applicable to logs that would be pushed starting now
// Note: Another periodic config can be applicable in future which can change index type
func activeIndexType(cfg chunk.SchemaConfig) string {
func activePeriodConfig(cfg chunk.SchemaConfig) chunk.PeriodConfig {
    now := model.Now()
    i := sort.Search(len(cfg.Configs), func(i int) bool {
        return cfg.Configs[i].From.Time > now
    })
    if i > 0 {
        i--
    }
    return cfg.Configs[i].IndexType
    return cfg.Configs[i]
}

func calculateMaxLookBack(pc chunk.PeriodConfig, maxLookBackConfig, maxChunkAge time.Duration) (time.Duration, error) {
Contributor: That deserves a test and it seems simple.

Collaborator (Author): lol, yeah, I thought I put it in the description or somewhere that I wanted to test this but hadn't done it yet.

    if pc.ObjectType != local.FilesystemObjectStoreType && maxLookBackConfig.Milliseconds() != 0 {
        return 0, errors.New("it is an error to specify a non zero `query_store_max_look_back_period` value when using any object store other than `filesystem`")
    }
    // When using shipper, limit max look back for query to MaxChunkAge + upload interval by shipper + 15 mins to query only data whose index is not pushed yet
    defaultMaxLookBack := maxChunkAge + local.ShipperFileUploadInterval + (15 * time.Minute)

    if maxLookBackConfig == 0 {
        // If the QueryStoreMaxLookBackPeriod is still its default value of 0, set it to the default calculated value.
        return defaultMaxLookBack, nil
    } else if maxLookBackConfig > 0 && maxLookBackConfig < defaultMaxLookBack {
        // If the QueryStoreMaxLookBackPeriod is > 0 (-1 is allowed for infinite), make sure it's at least greater than the default or throw an error
        return 0, fmt.Errorf("the configured query_store_max_look_back_period of '%v' is less than the calculated default of '%v' "+
            "which is calculated based on the max_chunk_age + 15 minute boltdb-shipper interval + 15 min additional buffer. Increase this value "+
            "to be greater than the default or remove it from the configuration to use the default", maxLookBackConfig, defaultMaxLookBack)
    }
    return maxLookBackConfig, nil
}
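
To make the validation above concrete, a hedged example: with the `filesystem` object store and `max_chunk_age: 2h`, the calculated default look-back is roughly 2h + the 15 minute boltdb-shipper upload interval + a 15 minute buffer, about 2h30m (per the error message above). A config like the following sketch would be rejected for the commented-out value, while `-1` or anything above roughly 2h30m is accepted:

```yaml
ingester:
  max_chunk_age: 2h
  # query_store_max_look_back_period: 1h   # rejected: less than the ~2h30m calculated default
  query_store_max_look_back_period: 3h     # accepted: greater than the calculated default (-1 would also work)
```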
6 changes: 3 additions & 3 deletions pkg/loki/modules_test.go
@@ -48,20 +48,20 @@ func TestActiveIndexType(t *testing.T) {
IndexType: "first",
}}

assert.Equal(t, "first", activeIndexType(cfg))
assert.Equal(t, cfg.Configs[0], activePeriodConfig(cfg))

// add a newer PeriodConfig in the past which should be considered
cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{
From: chunk.DayTime{Time: model.Now().Add(-12 * time.Hour)},
IndexType: "second",
})
assert.Equal(t, "second", activeIndexType(cfg))
assert.Equal(t, cfg.Configs[1], activePeriodConfig(cfg))

// add a newer PeriodConfig in the future which should not be considered
cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{
From: chunk.DayTime{Time: model.Now().Add(time.Hour)},
IndexType: "third",
})
assert.Equal(t, "second", activeIndexType(cfg))
assert.Equal(t, cfg.Configs[1], activePeriodConfig(cfg))

}