Skip to content

Commit

Permalink
Add backoff config to loki source file (#4927)
Browse files Browse the repository at this point in the history
* add backoff config to loki source file

* add loki source file backoff config in converter

* replace time.Now().Sub() by time.Since()

* rename backoff to file_watch

* Update docs/sources/flow/reference/components/loki.source.file.md

Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com>

* check for err in test

* fix changelog

* change sleep value in test to make it more robust

---------

Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com>
  • Loading branch information
wildum and clayton-cornell committed Sep 1, 2023
1 parent da42ad5 commit 61670d8
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 21 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ Main (unreleased)

- Clustering: Add a new flag `--cluster.name` to prevent nodes without this identifier from joining the cluster. (@wildum)

- Add a `file_watch` block in `loki.source.file` to configure how often to poll files from disk for changes via `min_poll_frequency` and `max_poll_frequency`.
In static mode it can be configured in the global `file_watch_config` via `min_poll_frequency` and `max_poll_frequency`. (@wildum)

### Enhancements

- Clustering: Allow advertise interfaces to be configurable. (@wildum)
Expand Down
24 changes: 24 additions & 0 deletions component/loki/source/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/grafana/agent/component/common/loki"
"github.com/grafana/agent/component/common/loki/positions"
"github.com/grafana/agent/component/discovery"
"github.com/grafana/tail/watch"
"github.com/prometheus/common/model"
)

Expand All @@ -40,6 +41,24 @@ type Arguments struct {
ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
Encoding string `river:"encoding,attr,optional"`
DecompressionConfig DecompressionConfig `river:"decompression,block,optional"`
FileWatch FileWatch `river:"file_watch,block,optional"`
}

type FileWatch struct {
MinPollFrequency time.Duration `river:"min_poll_frequency,attr,optional"`
MaxPollFrequency time.Duration `river:"max_poll_frequency,attr,optional"`
}

var DefaultArguments = Arguments{
FileWatch: FileWatch{
MinPollFrequency: 250 * time.Millisecond,
MaxPollFrequency: 250 * time.Millisecond,
},
}

// SetToDefault implements river.Defaulter.
func (a *Arguments) SetToDefault() {
*a = DefaultArguments
}

type DecompressionConfig struct {
Expand Down Expand Up @@ -316,6 +335,10 @@ func (c *Component) startTailing(path string, labels model.LabelSet, handler lok
reader = decompressor
} else {
level.Debug(c.opts.Logger).Log("msg", "tailing new file", "filename", path)
pollOptions := watch.PollingFileWatcherOptions{
MinPollFrequency: c.args.FileWatch.MinPollFrequency,
MaxPollFrequency: c.args.FileWatch.MaxPollFrequency,
}
tailer, err := newTailer(
c.metrics,
c.opts.Logger,
Expand All @@ -324,6 +347,7 @@ func (c *Component) startTailing(path string, labels model.LabelSet, handler lok
path,
labels.String(),
c.args.Encoding,
pollOptions,
)
if err != nil {
level.Error(c.opts.Logger).Log("msg", "failed to start tailer", "error", err, "filename", path)
Expand Down
58 changes: 58 additions & 0 deletions component/loki/source/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,64 @@ func Test(t *testing.T) {
}
}

func TestFileWatch(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))
ctx, cancel := context.WithCancel(componenttest.TestContext(t))

// Create file to log to.
f, err := os.CreateTemp(t.TempDir(), "example")
require.NoError(t, err)
defer f.Close()

ctrl, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.source.file")
require.NoError(t, err)

ch1 := loki.NewLogsReceiver()

args := Arguments{
Targets: []discovery.Target{{
"__path__": f.Name(),
"foo": "bar",
}},
ForwardTo: []loki.LogsReceiver{ch1},
FileWatch: FileWatch{
MinPollFrequency: time.Millisecond * 500,
MaxPollFrequency: time.Millisecond * 500,
},
}

go func() {
err := ctrl.Run(ctx, args)
require.NoError(t, err)
}()

err = ctrl.WaitRunning(time.Minute)
require.NoError(t, err)

timeBeforeWriting := time.Now()

// Sleep for 600ms to miss the first poll, the next poll should be MaxPollFrequency later.
time.Sleep(time.Millisecond * 600)

_, err = f.Write([]byte("writing some text\n"))
require.NoError(t, err)

select {
case logEntry := <-ch1.Chan():
require.Greater(t, time.Since(timeBeforeWriting), 1*time.Second)
require.WithinDuration(t, time.Now(), timeBeforeWriting, 2*time.Second)
require.Equal(t, "writing some text", logEntry.Line)
case <-time.After(5 * time.Second):
require.FailNow(t, "failed waiting for log line")
}

// Shut down the component.
cancel()

// Wait to make sure that all go routines stopped.
time.Sleep(args.FileWatch.MaxPollFrequency)
}

// Test that updating the component does not leak goroutines.
func TestUpdate_NoLeak(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))
Expand Down
6 changes: 4 additions & 2 deletions component/loki/source/file/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/tail"
"github.com/grafana/tail/watch"
"github.com/prometheus/common/model"
"go.uber.org/atomic"
"golang.org/x/text/encoding"
Expand Down Expand Up @@ -44,7 +45,7 @@ type tailer struct {
decoder *encoding.Decoder
}

func newTailer(metrics *metrics, logger log.Logger, handler loki.EntryHandler, positions positions.Positions, path string, labels string, encoding string) (*tailer, error) {
func newTailer(metrics *metrics, logger log.Logger, handler loki.EntryHandler, positions positions.Positions, path string, labels string, encoding string, pollOptions watch.PollingFileWatcherOptions) (*tailer, error) {
// Simple check to make sure the file we are tailing doesn't
// have a position already saved which is past the end of the file.
fi, err := os.Stat(path)
Expand All @@ -69,7 +70,8 @@ func newTailer(metrics *metrics, logger log.Logger, handler loki.EntryHandler, p
Offset: pos,
Whence: 0,
},
Logger: util.NewLogAdapter(logger),
Logger: util.NewLogAdapter(logger),
PollOptions: pollOptions,
})
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/grafana/agent/converter/internal/common"
"github.com/grafana/agent/converter/internal/prometheusconvert"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/clients/pkg/promtail/targets/file"
"github.com/grafana/river/scanner"
"github.com/grafana/river/token/builder"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -81,7 +82,7 @@ func (s *ScrapeConfigBuilder) Sanitize() {
}
}

func (s *ScrapeConfigBuilder) AppendLokiSourceFile() {
func (s *ScrapeConfigBuilder) AppendLokiSourceFile(watchConfig *file.WatchConfig) {
// If there were no targets expressions collected, that means
// we didn't have any components that produced SD targets, so
// we can skip this component.
Expand All @@ -95,6 +96,7 @@ func (s *ScrapeConfigBuilder) AppendLokiSourceFile() {
ForwardTo: forwardTo,
Encoding: s.cfg.Encoding,
DecompressionConfig: convertDecompressionConfig(s.cfg.DecompressionCfg),
FileWatch: convertFileWatchConfig(watchConfig),
}
overrideHook := func(val interface{}) interface{} {
if _, ok := val.([]discovery.Target); ok {
Expand Down Expand Up @@ -258,6 +260,16 @@ func convertDecompressionConfig(cfg *scrapeconfig.DecompressionConfig) lokisourc
}
}

func convertFileWatchConfig(watchConfig *file.WatchConfig) lokisourcefile.FileWatch {
if watchConfig == nil {
return lokisourcefile.FileWatch{}
}
return lokisourcefile.FileWatch{
MinPollFrequency: watchConfig.MinPollFrequency,
MaxPollFrequency: watchConfig.MaxPollFrequency,
}
}

func logsReceiversToExpr(r []loki.LogsReceiver) string {
var exprs []string
for _, r := range r {
Expand Down
6 changes: 4 additions & 2 deletions converter/internal/promtailconvert/promtailconvert.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/grafana/loki/clients/pkg/promtail/limit"
"github.com/grafana/loki/clients/pkg/promtail/positions"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/clients/pkg/promtail/targets/file"
lokicfgutil "github.com/grafana/loki/pkg/util/cfg"
"github.com/grafana/river/token/builder"
"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -99,7 +100,7 @@ func AppendAll(f *builder.File, cfg *promtailcfg.Config, labelPrefix string, dia
}

for _, sc := range cfg.ScrapeConfig {
appendScrapeConfig(f, &sc, &diags, gc)
appendScrapeConfig(f, &sc, &diags, gc, &cfg.Global.FileWatch)
}

for _, write := range writeBlocks {
Expand Down Expand Up @@ -127,6 +128,7 @@ func appendScrapeConfig(
cfg *scrapeconfig.Config,
diags *diag.Diagnostics,
gctx *build.GlobalContext,
watchConfig *file.WatchConfig,
) {

b := build.NewScrapeConfigBuilder(f, diags, cfg, gctx)
Expand All @@ -149,7 +151,7 @@ func appendScrapeConfig(
// If any relabelling is required, it will be done via a discovery.relabel component.
// The files will be watched and the globs in file paths will be expanded using discovery.file component.
// The log entries are sent to loki.process if processing is needed, or directly to loki.write components.
b.AppendLokiSourceFile()
b.AppendLokiSourceFile(watchConfig)

// Append all the components that produce logs directly.
// If any relabelling is required, it will be done via a loki.relabel component.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
(Error) global/file_watch_config is not supported
(Error) global positions configuration is not supported - each Flow Mode's loki.source.file component has its own positions file in the component's data directory
(Warning) stream_lag_labels is deprecated and the associated metric has been removed
(Error) Promtail's WAL is currently not supported in Flow Mode
Expand Down
7 changes: 0 additions & 7 deletions converter/internal/promtailconvert/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,11 @@ package promtailconvert
import (
"github.com/grafana/agent/converter/diag"
promtailcfg "github.com/grafana/loki/clients/pkg/promtail/config"
"github.com/grafana/loki/clients/pkg/promtail/targets/file"
)

// validateTopLevelConfig validates the top-level config for any unsupported features. There may still be some
// other unsupported features in scope of each config block, which are raised by their respective conversion code.
func validateTopLevelConfig(cfg *promtailcfg.Config, diags *diag.Diagnostics) {
// We currently do not support the new global file watch config. It's an error, since setting it indicates
// some advanced tuning which the user likely needs.
if cfg.Global.FileWatch != file.DefaultWatchConig {
diags.Add(diag.SeverityLevelError, "global/file_watch_config is not supported")
}

// The positions global config is not supported in Flow Mode.
if cfg.PositionsConfig != DefaultPositionsConfig() {
diags.Add(
Expand Down
10 changes: 10 additions & 0 deletions converter/internal/staticconvert/testdata/promtail_prom.river
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ local.file_match "logs_name_jobName" {
loki.source.file "logs_name_jobName" {
targets = local.file_match.logs_name_jobName.targets
forward_to = [loki.write.logs_name.receiver]

file_watch {
min_poll_frequency = "1s"
max_poll_frequency = "5s"
}
}

loki.write "logs_name" {
Expand Down Expand Up @@ -118,6 +123,11 @@ local.file_match "logs_name2_jobName" {
loki.source.file "logs_name2_jobName" {
targets = local.file_match.logs_name2_jobName.targets
forward_to = [loki.write.logs_name2.receiver]

file_watch {
min_poll_frequency = "1s"
max_poll_frequency = "5s"
}
}

loki.write "logs_name2" {
Expand Down
4 changes: 4 additions & 0 deletions converter/internal/staticconvert/testdata/promtail_prom.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ metrics:
services: ['myapp']

logs:
global:
file_watch_config:
min_poll_frequency: 1s
max_poll_frequency: 5s
positions_directory: /path
configs:
- name: name
Expand Down
32 changes: 24 additions & 8 deletions docs/sources/flow/reference/components/loki.source.file.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ log entries to the list of receivers passed in `forward_to`.

`loki.source.file` supports the following arguments:

Name | Type | Description | Default | Required
--------------|----------------------|--------------------------------------------------|---------|----------
`targets` | `list(map(string))` | List of files to read from. | | yes
`forward_to` | `list(LogsReceiver)` | List of receivers to send log entries to. | | yes
`encoding` | `string` | The encoding to convert from when reading files. | `""` | no
Name | Type | Description | Default | Required
--------------|----------------------|------------------------------------------------------------|---------|----------
`targets` | `list(map(string))` | List of files to read from. | | yes
`forward_to` | `list(LogsReceiver)` | List of receivers to send log entries to. | | yes
`encoding` | `string` | The encoding to convert from when reading files. | `""` | no

The `encoding` argument must be a valid [IANA encoding][] name. If not set, it
defaults to UTF-8.
Expand All @@ -43,11 +43,13 @@ defaults to UTF-8.

The following blocks are supported inside the definition of `loki.source.file`:

Hierarchy | Name | Description | Required
----------------|--------------------|-----------------------------------------------|----------
decompresssion | [decompresssion][] | Configure reading logs from compressed files. | no
Hierarchy | Name | Description | Required
----------------|--------------------|-------------------------------------------------------------------|----------
decompresssion | [decompresssion][] | Configure reading logs from compressed files. | no
file_watch | [file_watch][] | Configure how often files should be polled from disk for changes. | no

[decompresssion]: #decompresssion-block
[file_watch]: #file_watch-block

### decompresssion block

Expand All @@ -72,6 +74,20 @@ Currently supported compression formats are:
The component can only support one compression format at a time, in order to
handle multiple formats, you will need to create multiple components.

### file_watch block

The `file_watch` block configures how often log files are polled from disk for changes.
The following arguments are supported:

Name | Type | Description | Default | Required
----------------------|------------|-------------------------------------------|---------|----------
`min_poll_frequency` | `duration` | Minimum frequency to poll for files. | 250ms | no
`max_poll_frequency` | `duration` | Maximum frequency to poll for files. | 250ms | no

If no file changes are detected, the poll frequency doubles until a file change is detected or the poll frequency reaches the `max_poll_frequency`.

If file changes are detected, the poll frequency is reset to `min_poll_frequency`.

## Exported fields

`loki.source.file` does not export any fields.
Expand Down

0 comments on commit 61670d8

Please sign in to comment.