Skip to content

Commit

Permalink
Loki: Allow configuring query_store_max_look_back_period when running…
Browse files Browse the repository at this point in the history
… a filesystem store and boltdb-shipper (#2073)

* Allow configuring query_store_max_look_back_period when running a filesystem store, which allows for boltdb-shipper to horizontally scale filesystem store.

Signed-off-by: Ed Welch <edward.welch@grafana.com>

* Update docs/operations/storage/boltdb-shipper.md

Co-authored-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* add some more precision to the timestamp used to make the name unique just to make sure there are not collisions if for some reason the ID is empty (which can happen if the hostname doesn't resolve for some reason)

Signed-off-by: Ed Welch <edward.welch@grafana.com>

* Allow the ingester to query the store for labels if query store is enabled.

Signed-off-by: Ed Welch <edward.welch@grafana.com>

* make queriers aware of the fact that ingesters may be querying the store so they don't duplicate query efforts

Signed-off-by: Ed Welch <edward.welch@grafana.com>

* improve the docs

Signed-off-by: Ed Welch <edward.welch@grafana.com>

* add tests to the calculateMaxLookBack

Signed-off-by: Ed Welch <edward.welch@grafana.com>

Co-authored-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
  • Loading branch information
slim-bean and sandeepsukhani authored May 14, 2020
1 parent 7dd1097 commit 9803eab
Show file tree
Hide file tree
Showing 10 changed files with 410 additions and 74 deletions.
7 changes: 7 additions & 0 deletions docs/configuration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,13 @@ The `ingester_config` block configures Ingesters.
# The maximum duration of a timeseries chunk in memory. If a timeseries runs for longer than this the current chunk will be flushed to the store and a new chunk created.
[max_chunk_age: <duration> | default = 1h]

# How far in the past an ingester is allowed to query the store for data.
# This is only useful for running multiple loki binaries with a shared ring with a `filesystem` store which is NOT shared between the binaries
# When using any "shared" object store like S3 or GCS this value must always be left as 0
# It is an error to configure this to a non-zero value when using any object store other than `filesystem`
# Use a value of -1 to allow the ingester to query the store infinitely far back in time.
[query_store_max_look_back_period: <duration> | default = 0]

```
### lifecycler_config
Expand Down
3 changes: 2 additions & 1 deletion docs/operations/storage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ The following are supported for the index:
* [Google Bigtable](https://cloud.google.com/bigtable)
* [Apache Cassandra](https://cassandra.apache.org)
* [BoltDB](https://github.com/boltdb/bolt) (doesn't work when clustering Loki)
* [Boltb-Shipper](boltdb-shipper.md) EXPERIMENTAL index store which stores boltdb index files in the object store

The following are supported for the chunks:

Expand All @@ -34,7 +35,7 @@ The following are supported for the chunks:
* [Apache Cassandra](https://cassandra.apache.org)
* [Amazon S3](https://aws.amazon.com/s3)
* [Google Cloud Storage](https://cloud.google.com/storage/)
* Filesystem (doesn't work when clustering Loki)
* [Filesystem](filesystem.md) (please read more about the filesystem to understand the pros/cons before using with production data)

## Cloud Storage Permissions

Expand Down
141 changes: 141 additions & 0 deletions docs/operations/storage/filesystem.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# Filesystem Object Store

The filesystem object store is the easiest to get started with Loki but there are some pros/cons to this approach.

Very simply it stores all the objects (chunks) in the specified directory:

```yaml
storage_config:
filesystem:
directory: /tmp/loki/
```
A folder is created for every tenant all the chunks for one tenant are stored in that directory.
If loki is run in single-tenant mode, all the chunks are put in a folder named `fake` which is the synthesized tenant name used for single tenant mode.

See [multi-tenancy](../multi-tenancy.md) for more information.

## Pros

Very simple, no additional software required to use Loki when paired with the BoltDB index store.

Great for low volume applications, proof of concepts, and just playing around with Loki.

## Cons

### Scaling

At some point there is a limit to how many chunks can be stored in a single directory, for example see [this issue](https://github.com/grafana/loki/issues/1502) which explains how a Loki user ran into a strange error with about **5.5 million chunk files** in their file store (and also a workaround for the problem).

However, if you keep your streams low (remember loki writes a chunk per stream) and use configs like `chunk_target_size` (around 1MB), `max_chunk_age` (increase beyond 1h), `chunk_idle_period` (increase to match `max_chunk_age`) can be tweaked to reduce the number of chunks flushed (although they will trade for more memory consumption).

It's still very possible to store terabytes of log data with the filestore, but realize there are limitations to how many files a filesystem will want to store in a single directory.

### Durability

The durability of the objects is at the mercy of the filesystem itself where other object stores like S3/GCS do a lot behind the scenes to offer extremely high durability to your data.

### High Availability

Running Loki clustered is not possible with the filesystem store unless the filesystem is shared in some fashion (NFS for example). However using shared filesystems is likely going to be a bad experience with Loki just as it is for almost every other application.

## New AND VERY EXPERIMENTAL in 1.5.0: Horizontal scaling of the filesystem store

**WARNING** as the title suggests, this is very new and potentially buggy, and it is also very likely configs around this feature will change over time.

With that warning out of the way, the addition of the [boltdb-shipper](boltdb-shipper.md) index store has added capabilities making it possible to overcome many of the limitations listed above using the filesystem store, specifically running Loki with the filesystem store on separate machines but still operate as a cluster supporting replication, and write distribution via the hash ring.

As mentioned in the title, this is very alpha at this point but we would love for people to try this and help us flush out bugs.

Here is an example config to run with Loki:

Use this config on multiple computers (or containers), do not run it on the same computer as Loki uses the hostname as the ID in the ring.

Do not use a shared fileystem such as NFS for this, each machine should have its own filesystem

```yaml
auth_enabled: false # single tenant mode
server:
http_listen_port: 3100
ingester:
max_transfer_retries: 0 # Disable blocks transfers on ingesters shutdown or rollout.
chunk_idle_period: 2h # Let chunks sit idle for at least 2h before flushing, this helps to reduce total chunks in store
max_chunk_age: 2h # Let chunks get at least 2h old before flushing due to age, this helps to reduce total chunks in store
chunk_target_size: 1048576 # Target chunks of 1MB, this helps to reduce total chunks in store
chunk_retain_period: 30s
query_store_max_look_back_period: -1 # This will allow the ingesters to query the store for all data
lifecycler:
heartbeat_period: 5s
interface_names:
- eth0
join_after: 30s
num_tokens: 512
ring:
heartbeat_timeout: 1m
kvstore:
consul:
consistent_reads: true
host: localhost:8500
http_client_timeout: 20s
store: consul
replication_factor: 1 # This can be increased and probably should if you are running multiple machines!
schema_config:
configs:
- from: 2018-04-15
store: boltdb-shipper
object_store: filesystem
schema: v11
index:
prefix: index_
period: 168h
storage_config:
boltdb_shipper:
shared_store: filesystem
active_index_directory: /tmp/loki/index
cache_location: /tmp/loki/boltdb-cache
filesystem:
directory: /tmp/loki/chunks
limits_config:
enforce_metric_name: false
reject_old_samples: true
reject_old_samples_max_age: 168h
chunk_store_config:
max_look_back_period: 0s # No limit how far we can look back in the store
table_manager:
retention_deletes_enabled: false
retention_period: 0s # No deletions, infinite retention
```

It does require Consul to be running for the ring (any of the ring stores will work: consul, etcd, memberlist, Consul is used in this example)

It is also required that Consul be available from each machine, this example only specifies `host: localhost:8500` you would likely need to change this to the correct hostname/ip and port of your consul server.

**The config needs to be the same on every Loki instance!**

The important piece of this config is `query_store_max_look_back_period: -1` this tells Loki to allow the ingesters to look in the store for all the data.

Traffic can be sent to any of the Loki servers, it can be round-robin load balanced if desired.

Each Loki instance will use Consul to properly route both read and write data to the correct Loki instance.

Scaling up is as easy as adding more loki instances and letting them talk to the same ring.

Scaling down is harder but possible. You would need to shutdown a Loki server then take everything in:

```yaml
filesystem:
directory: /tmp/loki/chunks
```

And copy it to the same directory on another Loki server, there is currently no way to split the chunks between servers you must move them all. We expect to provide more options here in the future.


67 changes: 59 additions & 8 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/user"
"google.golang.org/grpc/health/grpc_health_v1"

Expand All @@ -25,6 +26,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
listutil "github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)

Expand Down Expand Up @@ -64,7 +66,7 @@ type Config struct {
ingesterClientFactory func(cfg client.Config, addr string) (client.HealthAndIngesterClient, error)

QueryStore bool `yaml:"-"`
QueryStoreMaxLookBackPeriod time.Duration `yaml:"-"`
QueryStoreMaxLookBackPeriod time.Duration `yaml:"query_store_max_look_back_period"`
}

// RegisterFlags registers the flags.
Expand All @@ -84,6 +86,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.Float64Var(&cfg.SyncMinUtilization, "ingester.sync-min-utilization", 0, "Minimum utilization of chunk when doing synchronization.")
f.IntVar(&cfg.MaxReturnedErrors, "ingester.max-ignored-stream-errors", 10, "Maximum number of ignored stream errors to return. 0 to return all errors.")
f.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", time.Hour, "Maximum chunk age before flushing.")
f.DurationVar(&cfg.QueryStoreMaxLookBackPeriod, "ingester.query-store-max-look-back-period", 0, "How far back should an ingester be allowed to query the store for data, for use only with boltdb-shipper index and filesystem object store. -1 for infinite.")
}

// Ingester builds chunks for incoming log streams.
Expand Down Expand Up @@ -311,7 +314,50 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
}

instance := i.getOrCreateInstance(instanceID)
return instance.Label(ctx, req)
resp, err := instance.Label(ctx, req)
if err != nil {
return nil, err
}

// Only continue if we should query the store for labels
if !i.cfg.QueryStore {
return resp, nil
}

// Only continue if the store is a chunk.Store
var cs chunk.Store
var ok bool
if cs, ok = i.store.(chunk.Store); !ok {
return resp, nil
}

userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}
// Adjust the start time based on QueryStoreMaxLookBackPeriod.
start := adjustQueryStartTime(i.cfg, *req.Start)
if start.After(*req.End) {
// The request is older than we are allowed to query the store, just return what we have.
return resp, nil
}
from, through := model.TimeFromUnixNano(start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano())
var storeValues []string
if req.Values {
storeValues, err = cs.LabelValuesForMetricName(ctx, userID, from, through, "logs", req.Name)
if err != nil {
return nil, err
}
} else {
storeValues, err = cs.LabelNamesForMetricName(ctx, userID, from, through, "logs")
if err != nil {
return nil, err
}
}

return &logproto.LabelResponse{
Values: listutil.MergeStringLists(resp.Values, storeValues),
}, nil
}

// Series queries the ingester for log stream identifiers (label sets) matching a set of matchers
Expand Down Expand Up @@ -414,12 +460,7 @@ func buildStoreRequest(cfg Config, req *logproto.QueryRequest) *logproto.QueryRe
}
start := req.Start
end := req.End
if cfg.QueryStoreMaxLookBackPeriod != 0 {
oldestStartTime := time.Now().Add(-cfg.QueryStoreMaxLookBackPeriod)
if oldestStartTime.After(req.Start) {
start = oldestStartTime
}
}
start = adjustQueryStartTime(cfg, start)

if start.After(end) {
return nil
Expand All @@ -431,3 +472,13 @@ func buildStoreRequest(cfg Config, req *logproto.QueryRequest) *logproto.QueryRe

return &newRequest
}

func adjustQueryStartTime(cfg Config, start time.Time) time.Time {
if cfg.QueryStoreMaxLookBackPeriod > 0 {
oldestStartTime := time.Now().Add(-cfg.QueryStoreMaxLookBackPeriod)
if oldestStartTime.After(start) {
start = oldestStartTime
}
}
return start
}
43 changes: 35 additions & 8 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package loki

import (
"errors"
"fmt"
"net/http"
"os"
Expand Down Expand Up @@ -163,7 +164,9 @@ func (t *Loki) initQuerier() (services.Service, error) {
if err != nil {
return nil, err
}

if t.cfg.Ingester.QueryStoreMaxLookBackPeriod != 0 {
t.cfg.Querier.IngesterQueryStoreMaxLookback = t.cfg.Ingester.QueryStoreMaxLookBackPeriod
}
t.querier, err = querier.New(t.cfg.Querier, t.cfg.IngesterClient, t.ring, t.store, t.overrides)
if err != nil {
return nil, err
Expand Down Expand Up @@ -198,10 +201,14 @@ func (t *Loki) initIngester() (_ services.Service, err error) {
t.cfg.Ingester.LifecyclerConfig.ListenPort = t.cfg.Server.GRPCListenPort

// We want ingester to also query the store when using boltdb-shipper
if activeIndexType(t.cfg.SchemaConfig) == local.BoltDBShipperType {
pc := activePeriodConfig(t.cfg.SchemaConfig)
if pc.IndexType == local.BoltDBShipperType {
t.cfg.Ingester.QueryStore = true
// When using shipper, limit max look back for query to MaxChunkAge + upload interval by shipper + 15 mins to query only data whose index is not pushed yet
t.cfg.Ingester.QueryStoreMaxLookBackPeriod = t.cfg.Ingester.MaxChunkAge + local.ShipperFileUploadInterval + (15 * time.Minute)
mlb, err := calculateMaxLookBack(pc, t.cfg.Ingester.QueryStoreMaxLookBackPeriod, t.cfg.Ingester.MaxChunkAge)
if err != nil {
return nil, err
}
t.cfg.Ingester.QueryStoreMaxLookBackPeriod = mlb
}

t.ingester, err = ingester.New(t.cfg.Ingester, t.cfg.IngesterClient, t.store, t.overrides)
Expand Down Expand Up @@ -256,7 +263,7 @@ func (t *Loki) initTableManager() (services.Service, error) {
}

func (t *Loki) initStore() (_ services.Service, err error) {
if activeIndexType(t.cfg.SchemaConfig) == local.BoltDBShipperType {
if activePeriodConfig(t.cfg.SchemaConfig).IndexType == local.BoltDBShipperType {
t.cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.cfg.Ingester.LifecyclerConfig.ID
switch t.cfg.Target {
case Ingester:
Expand Down Expand Up @@ -483,15 +490,35 @@ var modules = map[moduleName]module{
},
}

// activeIndexType type returns index type which would be applicable to logs that would be pushed starting now
// activePeriodConfig type returns index type which would be applicable to logs that would be pushed starting now
// Note: Another periodic config can be applicable in future which can change index type
func activeIndexType(cfg chunk.SchemaConfig) string {
func activePeriodConfig(cfg chunk.SchemaConfig) chunk.PeriodConfig {
now := model.Now()
i := sort.Search(len(cfg.Configs), func(i int) bool {
return cfg.Configs[i].From.Time > now
})
if i > 0 {
i--
}
return cfg.Configs[i].IndexType
return cfg.Configs[i]
}

func calculateMaxLookBack(pc chunk.PeriodConfig, maxLookBackConfig, maxChunkAge time.Duration) (time.Duration, error) {
if pc.ObjectType != local.FilesystemObjectStoreType && maxLookBackConfig.Nanoseconds() != 0 {
return 0, errors.New("it is an error to specify a non zero `query_store_max_look_back_period` value when using any object store other than `filesystem`")
}
// When using shipper, limit max look back for query to MaxChunkAge + upload interval by shipper + 15 mins to query only data whose index is not pushed yet
defaultMaxLookBack := maxChunkAge + local.ShipperFileUploadInterval + (15 * time.Minute)

if maxLookBackConfig == 0 {
// If the QueryStoreMaxLookBackPeriod is still it's default value of 0, set it to the default calculated value.
return defaultMaxLookBack, nil
} else if maxLookBackConfig > 0 && maxLookBackConfig < defaultMaxLookBack {
// If the QueryStoreMaxLookBackPeriod is > 0 (-1 is allowed for infinite), make sure it's at least greater than the default or throw an error
return 0, fmt.Errorf("the configured query_store_max_look_back_period of '%v' is less than the calculated default of '%v' "+
"which is calculated based on the max_chunk_age + 15 minute boltdb-shipper interval + 15 min additional buffer. Increase this value"+
"greater than the default or remove it from the configuration to use the default", maxLookBackConfig, defaultMaxLookBack)

}
return maxLookBackConfig, nil
}
Loading

0 comments on commit 9803eab

Please sign in to comment.