Implement prospector reloading (#3362)
This PR allows prospectors to be reloaded dynamically. It works the same way as module reloading in Metricbeat (a schematic sketch of such a reload loop follows the notes below).

**Refactoring**

* LoadStates was separated from NewProspector. After New, only the prospector ID is needed; setting up states requires more computation, so it happens in a second step once all validations have passed (sketched after this list).
* A prospector is only allowed to start when all of its states are set to Finished; otherwise LoadStates returns an error. This prevents a prospector from starting before a harvester has finished with a file. Such a prospector is picked up again during the next reloading phase.
* Extracted ReloadConfig to libbeat.
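
In sketch form, the resulting two-step startup flow looks as follows (types and calls as in the crawler.go diff below; error handling is condensed, so this is an illustration rather than the exact crawler code):

```go
// NewProspector only validates the config and computes the ID;
// LoadStates runs as a second step once all validations have passed.
p, err := prospector.NewProspector(cfg, outlet)
if err != nil {
	return err
}
// LoadStates fails if any state matching the prospector's glob
// patterns is not yet Finished; the prospector is then retried
// during the next reloading phase.
if err := p.LoadStates(states); err != nil {
	return err
}
go p.Run()
```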

**Limitations**

This implementation currently has some limitations. These are not new in Filebeat, but they require more care as configurations change more often.

* Two prospectors on one file: It is possible that two prospectors pick up the same file because they define overlapping patterns. As a consequence, two harvesters can run on the same file, which can lead to duplicates and unpredictable behaviour. The risk is minimized in that a prospector does not start as long as one of the states it takes care of is not finished, but it can still happen that a Finished state is picked up while it is also managed by another prospector. The user must ensure that no prospector paths overlap. This problem could potentially be solved in the future by a global harvester registry.

**Notes**

* In a later PR, more refactoring and unification of the reloading should happen.
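
For orientation, here is a minimal, hypothetical sketch of what a periodic reload loop does: scan a glob every period, start a runner per new config file, and stop runners whose file disappeared. All names (`reloadLoop`, `runner`, `start`) are made up for illustration; this is not the libbeat implementation.

```go
package reload

import (
	"log"
	"path/filepath"
	"time"
)

// runner is anything that can be stopped again, e.g. a running prospector.
type runner interface{ Stop() }

// reloadLoop periodically scans glob and reconciles the set of running
// runners with the set of config files found. Illustrative sketch only.
func reloadLoop(glob string, period time.Duration,
	start func(path string) (runner, error), done <-chan struct{}) {

	running := map[string]runner{}
	ticker := time.NewTicker(period)
	defer ticker.Stop()

	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			paths, err := filepath.Glob(glob)
			if err != nil {
				log.Printf("glob %q: %v", glob, err)
				continue
			}
			seen := map[string]bool{}
			for _, p := range paths {
				seen[p] = true
				if _, ok := running[p]; ok {
					continue // already running
				}
				r, err := start(p)
				if err != nil {
					// e.g. states not yet Finished: retry next period.
					log.Printf("start %s: %v", p, err)
					continue
				}
				running[p] = r
			}
			for p, r := range running {
				if !seen[p] {
					r.Stop() // config file was removed
					delete(running, p)
				}
			}
		}
	}
}
```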
ruflin authored and Steffen Siering committed Jan 23, 2017
1 parent 4f41540 commit 240bbd2
Showing 21 changed files with 513 additions and 83 deletions.
8 changes: 6 additions & 2 deletions filebeat/beater/filebeat.go
@@ -59,10 +59,14 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
// Add prospectors created by the modules
config.Prospectors = append(config.Prospectors, moduleProspectors...)

if len(config.Prospectors) == 0 {
if !config.ProspectorReload.Enabled() && len(config.Prospectors) == 0 {
return nil, errors.New("No prospectors defined. What files do you want me to watch?")
}

if *once && config.ProspectorReload.Enabled() {
return nil, errors.New("prospector reloading and -once cannot be used together.")
}

fb := &Filebeat{
done: make(chan struct{}),
config: &config,
@@ -170,7 +174,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
spooler.Stop()
}()

err = crawler.Start(registrar)
err = crawler.Start(registrar, config.ProspectorReload)
if err != nil {
return err
}
17 changes: 9 additions & 8 deletions filebeat/config/config.go
@@ -18,14 +18,15 @@ const (
)

type Config struct {
Prospectors []*common.Config `config:"prospectors"`
SpoolSize uint64 `config:"spool_size" validate:"min=1"`
PublishAsync bool `config:"publish_async"`
IdleTimeout time.Duration `config:"idle_timeout" validate:"nonzero,min=0s"`
RegistryFile string `config:"registry_file"`
ConfigDir string `config:"config_dir"`
ShutdownTimeout time.Duration `config:"shutdown_timeout"`
Modules []*common.Config `config:"modules"`
Prospectors []*common.Config `config:"prospectors"`
SpoolSize uint64 `config:"spool_size" validate:"min=1"`
PublishAsync bool `config:"publish_async"`
IdleTimeout time.Duration `config:"idle_timeout" validate:"nonzero,min=0s"`
RegistryFile string `config:"registry_file"`
ConfigDir string `config:"config_dir"`
ShutdownTimeout time.Duration `config:"shutdown_timeout"`
Modules []*common.Config `config:"modules"`
ProspectorReload *common.Config `config:"reload.prospectors"`
}

var (
56 changes: 38 additions & 18 deletions filebeat/crawler/crawler.go
@@ -16,15 +16,12 @@ type Crawler struct {
prospectorConfigs []*common.Config
out prospector.Outlet
wg sync.WaitGroup
reloader *prospector.ProspectorReloader
once bool
}

func New(out prospector.Outlet, prospectorConfigs []*common.Config, once bool) (*Crawler, error) {

if len(prospectorConfigs) == 0 {
return nil, fmt.Errorf("No prospectors defined. You must have at least one prospector defined in the config file.")
}

return &Crawler{
out: out,
prospectors: map[uint64]*prospector.Prospector{},
@@ -33,7 +30,7 @@ func New(out prospector.Outlet, prospectorConfigs []*common.Config, once bool) (
}, nil
}

func (c *Crawler) Start(r *registrar.Registrar) error {
func (c *Crawler) Start(r *registrar.Registrar, reloaderConfig *common.Config) error {

logp.Info("Loading Prospectors: %v", len(c.prospectorConfigs))

@@ -45,6 +42,15 @@ func (c *Crawler) Start(r *registrar.Registrar) error {
}
}

if reloaderConfig.Enabled() {
logp.Warn("EXPERIMENTAL feature dynamic configuration reloading is enabled.")

c.reloader = prospector.NewProspectorReloader(reloaderConfig, c.out, r)
go func() {
c.reloader.Run()
}()
}

logp.Info("Loading and starting Prospectors completed. Enabled prospectors: %v", len(c.prospectors))

return nil
@@ -54,44 +60,58 @@ func (c *Crawler) startProspector(config *common.Config, states []file.State) er
if !config.Enabled() {
return nil
}
prospector, err := prospector.NewProspector(config, states, c.out)
p, err := prospector.NewProspector(config, c.out)
if err != nil {
return fmt.Errorf("Error in initing prospector: %s", err)
}
prospector.Once = c.once
p.Once = c.once

if _, ok := c.prospectors[p.ID]; ok {
return fmt.Errorf("Prospector with same ID already exists: %v", p.ID)
}

if _, ok := c.prospectors[prospector.ID]; ok {
return fmt.Errorf("Prospector with same ID already exists: %v", prospector.ID)
err = p.LoadStates(states)
if err != nil {
return fmt.Errorf("error loading states for propsector %v: %v", p.ID, err)
}

c.prospectors[prospector.ID] = prospector
c.prospectors[p.ID] = p
c.wg.Add(1)

go func() {
logp.Debug("crawler", "Starting prospector: %v", prospector.ID)
defer logp.Debug("crawler", "Prospector stopped: %v", prospector.ID)
logp.Debug("crawler", "Starting prospector: %v", p.ID)
defer logp.Debug("crawler", "Prospector stopped: %v", p.ID)

defer c.wg.Done()
prospector.Run()
p.Run()
}()

return nil
}

func (c *Crawler) Stop() {
logp.Info("Stopping Crawler")
stopProspector := func(p *prospector.Prospector) {
defer c.wg.Done()
p.Stop()

asyncWaitStop := func(stop func()) {
c.wg.Add(1)
go func() {
defer c.wg.Done()
stop()
}()
}

logp.Info("Stopping %v prospectors", len(c.prospectors))
for _, p := range c.prospectors {
// Stop prospectors in parallel
c.wg.Add(1)
go stopProspector(p)
asyncWaitStop(p.Stop)
}

if c.reloader != nil {
asyncWaitStop(c.reloader.Stop)
}

c.WaitForCompletion()

logp.Info("Crawler stopped")
}

18 changes: 0 additions & 18 deletions filebeat/crawler/crawler_test.go

This file was deleted.

1 change: 1 addition & 0 deletions filebeat/docs/reference/configuration.asciidoc
@@ -12,6 +12,7 @@ configuration settings, you need to restart {beatname_uc} to pick up the changes
* <<configuration-filebeat-options>>
* <<configuration-global-options>>
* <<configuration-general>>
* <<filebeat-configuration-reloading>>
* <<elasticsearch-output>>
* <<logstash-output>>
* <<kafka-output>>
@@ -240,7 +240,7 @@ When this option is enabled, Filebeat closes a file as soon as the end of a file

WARNING: Only use this option if you understand that data loss is a potential side effect. Another side effect is that multiline events might not be completely sent before the timeout expires.

When this option is enabled, Filebeat gives every harvester a predefined lifetime. Regardless of where the reader is in the file, reading will stop after the `close_timeout` period has elapsed. This option can be useful for older log files when you want to spend only a predefined amount of time on the files. While `close_timeout` will close the file after the predefined timeout, if the file is still being updated, the prospector will start a new harvester again per the defined `scan_frequency`. And the close_timeout for this harvester will start again with the countdown for the timeout.

If you set `close_timeout` to equal `ignore_older`, the file will not be picked up if it's modified while the harvester is closed. This combination of settings normally leads to data loss, and the complete file is not sent.

@@ -572,6 +572,8 @@ filebeat.shutdown_timeout: 5s
pass::[<?edit_url https://github.com/elastic/beats/edit/master/libbeat/docs/generalconfig.asciidoc ?>]
include::../../../../libbeat/docs/generalconfig.asciidoc[]

include::./reload-configuration.asciidoc[]

pass::[<?edit_url https://github.com/elastic/beats/edit/master/libbeat/docs/outputconfig.asciidoc ?>]
include::../../../../libbeat/docs/outputconfig.asciidoc[]

@@ -0,0 +1,41 @@
[[filebeat-configuration-reloading]]
=== Reload Configuration

experimental[]

Reload configuration allows you to dynamically reload prospector configuration files. A glob can be defined that is
watched for prospector configuration changes. New prospectors are started and stopped accordingly. This is especially
useful in container environments where one container is used to tail logs from services in other containers on the same host.

The configuration in the main filebeat.yml config file looks as follows:

[source,yaml]
------------------------------------------------------------------------------
filebeat.reload.prospectors:
enabled: true
path: configs/*.yml
period: 10s
------------------------------------------------------------------------------

A path with a glob must be defined, specifying which files should be checked for changes, and a period sets how often
the files are checked. Do not set the period below 1s: the modification time of files is often stored in seconds,
so anything lower only causes unnecessary overhead.

The configuration inside the files found by the glob looks as follows:
[source,yaml]
------------------------------------------------------------------------------
- input_type: log
paths:
- /var/log/mysql.log
scan_frequency: 10s
- input_type: log
paths:
- /var/log/apache.log
scan_frequency: 5s
------------------------------------------------------------------------------

Each file directly contains a list of prospectors; a file can contain one or more prospector definitions.

WARNING: It is critical that two running prospectors DO NOT have overlapping file paths defined. If more than one prospector
harvests the same file at the same time, it can lead to unexpected behaviour.
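
Since overlapping paths are the main operational hazard here (see also the Limitations note in the commit message), a deployment could sanity-check its prospector path lists before dropping config files into the watched directory. The following standalone helper is a hypothetical illustration, not part of this PR; note that it only catches overlaps among files that exist at check time:

```go
package main

import (
	"fmt"
	"path/filepath"
)

// overlaps expands each prospector's glob patterns and reports files
// matched by more than one prospector.
func overlaps(prospectors [][]string) map[string][]int {
	claimed := map[string]map[int]bool{}
	for i, globs := range prospectors {
		for _, g := range globs {
			matches, err := filepath.Glob(g)
			if err != nil {
				continue // invalid pattern; real code should report it
			}
			for _, m := range matches {
				if claimed[m] == nil {
					claimed[m] = map[int]bool{}
				}
				claimed[m][i] = true
			}
		}
	}
	result := map[string][]int{}
	for file, owners := range claimed {
		if len(owners) > 1 {
			for owner := range owners {
				result[file] = append(result[file], owner)
			}
		}
	}
	return result
}

func main() {
	bad := overlaps([][]string{
		{"/var/log/mysql.log"},
		{"/var/log/*.log"}, // also matches mysql.log if it exists
	})
	for file, owners := range bad {
		fmt.Printf("%s is claimed by prospectors %v\n", file, owners)
	}
}
```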
13 changes: 4 additions & 9 deletions filebeat/prospector/prospector.go
@@ -39,15 +39,15 @@ type Prospector struct {
}

type Prospectorer interface {
Init(states []file.State) error
LoadStates(states []file.State) error
Run()
}

type Outlet interface {
OnEvent(event *input.Event) bool
}

func NewProspector(cfg *common.Config, states []file.State, outlet Outlet) (*Prospector, error) {
func NewProspector(cfg *common.Config, outlet Outlet) (*Prospector, error) {
prospector := &Prospector{
cfg: cfg,
config: defaultConfig,
@@ -72,18 +72,13 @@ func NewProspector(cfg *common.Config, states []file.State, outlet Outlet) (*Pro
return nil, err
}

err = prospector.Init(states)
if err != nil {
return nil, err
}

logp.Debug("prospector", "File Configs: %v", prospector.config.Paths)

return prospector, nil
}

// Init sets up default config for prospector
func (p *Prospector) Init(states []file.State) error {
func (p *Prospector) LoadStates(states []file.State) error {

var prospectorer Prospectorer
var err error
@@ -101,7 +96,7 @@ func (p *Prospector) Init(states []file.State) error {
return err
}

err = prospectorer.Init(states)
err = prospectorer.LoadStates(states)
if err != nil {
return err
}
10 changes: 8 additions & 2 deletions filebeat/prospector/prospector_log.go
@@ -2,6 +2,7 @@ package prospector

import (
"expvar"
"fmt"
"os"
"path/filepath"
"runtime"
@@ -34,17 +35,22 @@ func NewProspectorLog(p *Prospector) (*ProspectorLog, error) {
return prospectorer, nil
}

// Init sets up the prospector
// LoadStates loads states into prospector
// It goes through all states coming from the registry. Only the states which match the glob patterns of
// the prospector will be loaded and updated. All other states will not be touched.
func (p *ProspectorLog) Init(states []file.State) error {
func (p *ProspectorLog) LoadStates(states []file.State) error {
logp.Debug("prospector", "exclude_files: %s", p.config.ExcludeFiles)

for _, state := range states {
// Check if state source belongs to this prospector. If yes, update the state.
if p.matchesFile(state.Source) {
state.TTL = -1

// A prospector must not be started while a state matching its glob patterns is still unfinished
if !state.Finished {
return fmt.Errorf("Can only start a prospector when all related states are finished: %+v", state)
}

// Update prospector states and send new states to registry
err := p.Prospector.updateState(input.NewEvent(state))
if err != nil {
7 changes: 6 additions & 1 deletion filebeat/prospector/prospector_log_other_test.go
@@ -139,8 +139,13 @@ func TestInit(t *testing.T) {
},
}
states := file.NewStates()
// Set states to finished
for i, state := range test.states {
state.Finished = true
test.states[i] = state
}
states.SetStates(test.states)
err := p.Init(states.GetStates())
err := p.LoadStates(states.GetStates())
assert.NoError(t, err)
assert.Equal(t, test.count, p.Prospector.states.Count())
}
7 changes: 4 additions & 3 deletions filebeat/prospector/prospector_stdin.go
@@ -17,7 +17,9 @@ type ProspectorStdin struct {
// This prospector contains one harvester which is reading from stdin
func NewProspectorStdin(p *Prospector) (*ProspectorStdin, error) {

prospectorer := &ProspectorStdin{}
prospectorer := &ProspectorStdin{
started: false,
}

var err error

@@ -29,8 +31,7 @@ func NewProspectorStdin(p *Prospector) (*ProspectorStdin, error) {
return prospectorer, nil
}

func (p *ProspectorStdin) Init(states []file.State) error {
p.started = false
func (p *ProspectorStdin) LoadStates(states []file.State) error {
return nil
}

2 changes: 1 addition & 1 deletion filebeat/prospector/prospector_test.go
@@ -19,7 +19,7 @@ func TestProspectorInitInputTypeLogError(t *testing.T) {

states := file.NewStates()
states.SetStates([]file.State{})
err := prospector.Init(states.GetStates())
err := prospector.LoadStates(states.GetStates())
// Error should be returned because no path is set
assert.Error(t, err)
}
