diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 43cd2635c26..3f3bb7c8882 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -59,10 +59,14 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { // Add prospectors created by the modules config.Prospectors = append(config.Prospectors, moduleProspectors...) - if len(config.Prospectors) == 0 { + if !config.ProspectorReload.Enabled() && len(config.Prospectors) == 0 { return nil, errors.New("No prospectors defined. What files do you want me to watch?") } + if *once && config.ProspectorReload.Enabled() { + return nil, errors.New("prospector reloading and -once cannot be used together.") + } + fb := &Filebeat{ done: make(chan struct{}), config: &config, @@ -170,7 +174,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { spooler.Stop() }() - err = crawler.Start(registrar) + err = crawler.Start(registrar, config.ProspectorReload) if err != nil { return err } diff --git a/filebeat/config/config.go b/filebeat/config/config.go index d15f2e4c4bb..744c8094faa 100644 --- a/filebeat/config/config.go +++ b/filebeat/config/config.go @@ -18,14 +18,15 @@ const ( ) type Config struct { - Prospectors []*common.Config `config:"prospectors"` - SpoolSize uint64 `config:"spool_size" validate:"min=1"` - PublishAsync bool `config:"publish_async"` - IdleTimeout time.Duration `config:"idle_timeout" validate:"nonzero,min=0s"` - RegistryFile string `config:"registry_file"` - ConfigDir string `config:"config_dir"` - ShutdownTimeout time.Duration `config:"shutdown_timeout"` - Modules []*common.Config `config:"modules"` + Prospectors []*common.Config `config:"prospectors"` + SpoolSize uint64 `config:"spool_size" validate:"min=1"` + PublishAsync bool `config:"publish_async"` + IdleTimeout time.Duration `config:"idle_timeout" validate:"nonzero,min=0s"` + RegistryFile string `config:"registry_file"` + ConfigDir string `config:"config_dir"` + ShutdownTimeout time.Duration 
`config:"shutdown_timeout"` + Modules []*common.Config `config:"modules"` + ProspectorReload *common.Config `config:"reload.prospectors"` } var ( diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go index 573f89a96ef..d5a388214e6 100644 --- a/filebeat/crawler/crawler.go +++ b/filebeat/crawler/crawler.go @@ -16,15 +16,12 @@ type Crawler struct { prospectorConfigs []*common.Config out prospector.Outlet wg sync.WaitGroup + reloader *prospector.ProspectorReloader once bool } func New(out prospector.Outlet, prospectorConfigs []*common.Config, once bool) (*Crawler, error) { - if len(prospectorConfigs) == 0 { - return nil, fmt.Errorf("No prospectors defined. You must have at least one prospector defined in the config file.") - } - return &Crawler{ out: out, prospectors: map[uint64]*prospector.Prospector{}, @@ -33,7 +30,7 @@ func New(out prospector.Outlet, prospectorConfigs []*common.Config, once bool) ( }, nil } -func (c *Crawler) Start(r *registrar.Registrar) error { +func (c *Crawler) Start(r *registrar.Registrar, reloaderConfig *common.Config) error { logp.Info("Loading Prospectors: %v", len(c.prospectorConfigs)) @@ -45,6 +42,15 @@ func (c *Crawler) Start(r *registrar.Registrar) error { } } + if reloaderConfig.Enabled() { + logp.Warn("EXPERIMENTAL feature dynamic configuration reloading is enabled.") + + c.reloader = prospector.NewProspectorReloader(reloaderConfig, c.out, r) + go func() { + c.reloader.Run() + }() + } + logp.Info("Loading and starting Prospectors completed. 
Enabled prospectors: %v", len(c.prospectors)) return nil @@ -54,25 +60,30 @@ func (c *Crawler) startProspector(config *common.Config, states []file.State) er if !config.Enabled() { return nil } - prospector, err := prospector.NewProspector(config, states, c.out) + p, err := prospector.NewProspector(config, c.out) if err != nil { return fmt.Errorf("Error in initing prospector: %s", err) } - prospector.Once = c.once + p.Once = c.once + + if _, ok := c.prospectors[p.ID]; ok { + return fmt.Errorf("Prospector with same ID already exists: %v", p.ID) + } - if _, ok := c.prospectors[prospector.ID]; ok { - return fmt.Errorf("Prospector with same ID already exists: %v", prospector.ID) + err = p.LoadStates(states) + if err != nil { + return fmt.Errorf("error loading states for propsector %v: %v", p.ID, err) } - c.prospectors[prospector.ID] = prospector + c.prospectors[p.ID] = p c.wg.Add(1) go func() { - logp.Debug("crawler", "Starting prospector: %v", prospector.ID) - defer logp.Debug("crawler", "Prospector stopped: %v", prospector.ID) + logp.Debug("crawler", "Starting prospector: %v", p.ID) + defer logp.Debug("crawler", "Prospector stopped: %v", p.ID) defer c.wg.Done() - prospector.Run() + p.Run() }() return nil @@ -80,18 +91,27 @@ func (c *Crawler) startProspector(config *common.Config, states []file.State) er func (c *Crawler) Stop() { logp.Info("Stopping Crawler") - stopProspector := func(p *prospector.Prospector) { - defer c.wg.Done() - p.Stop() + + asyncWaitStop := func(stop func()) { + c.wg.Add(1) + go func() { + defer c.wg.Done() + stop() + }() } logp.Info("Stopping %v prospectors", len(c.prospectors)) for _, p := range c.prospectors { // Stop prospectors in parallel - c.wg.Add(1) - go stopProspector(p) + asyncWaitStop(p.Stop) } + + if c.reloader != nil { + asyncWaitStop(c.reloader.Stop) + } + c.WaitForCompletion() + logp.Info("Crawler stopped") } diff --git a/filebeat/crawler/crawler_test.go b/filebeat/crawler/crawler_test.go deleted file mode 100644 index 
a72c80a0e7c..00000000000 --- a/filebeat/crawler/crawler_test.go +++ /dev/null @@ -1,18 +0,0 @@ -// +build !integration - -package crawler - -import ( - "testing" - - "github.com/elastic/beats/libbeat/common" - "github.com/stretchr/testify/assert" -) - -func TestNewCrawlerNoProspectorsError(t *testing.T) { - prospectorConfigs := []*common.Config{} - - _, error := New(nil, prospectorConfigs, false) - - assert.Error(t, error) -} diff --git a/filebeat/docs/reference/configuration.asciidoc b/filebeat/docs/reference/configuration.asciidoc index 05d9ec0c05e..637b01c2587 100644 --- a/filebeat/docs/reference/configuration.asciidoc +++ b/filebeat/docs/reference/configuration.asciidoc @@ -12,6 +12,7 @@ configuration settings, you need to restart {beatname_uc} to pick up the changes * <> * <> * <> +* <> * <> * <> * <> diff --git a/filebeat/docs/reference/configuration/filebeat-options.asciidoc b/filebeat/docs/reference/configuration/filebeat-options.asciidoc index 599697728f2..91bebc207d9 100644 --- a/filebeat/docs/reference/configuration/filebeat-options.asciidoc +++ b/filebeat/docs/reference/configuration/filebeat-options.asciidoc @@ -240,7 +240,7 @@ When this option is enabled, Filebeat closes a file as soon as the end of a file WARNING: Only use this option if you understand that data loss is a potential side effect. Another side effect is that multiline events might not be completely sent before the timeout expires. -When this option is enabled, Filebeat gives every harvester a predefined lifetime. Regardless of where the reader is in the file, reading will stop after the `close_timeout` period has elapsed. This option can be useful for older log files when you want to spend only a predefined amount of time on the files. While `close_timeout` will close the file after the predefined timeout, if the file is still being updated, the prospector will start a new harvester again per the defined `scan_frequency`. 
And the close_timeout for this harvester will start again with the countdown for the timeout. +When this option is enabled, Filebeat gives every harvester a predefined lifetime. Regardless of where the reader is in the file, reading will stop after the `close_timeout` period has elapsed. This option can be useful for older log files when you want to spend only a predefined amount of time on the files. While `close_timeout` will close the file after the predefined timeout, if the file is still being updated, the prospector will start a new harvester again per the defined `scan_frequency`. And the close_timeout for this harvester will start again with the countdown for the timeout. If you set `close_timeout` to equal `ignore_older`, the file will not be picked up if it's modified while the harvester is closed. This combination of settings normally leads to data loss, and the complete file is not sent. @@ -572,6 +572,8 @@ filebeat.shutdown_timeout: 5s pass::[] include::../../../../libbeat/docs/generalconfig.asciidoc[] +include::./reload-configuration.asciidoc[] + pass::[] include::../../../../libbeat/docs/outputconfig.asciidoc[] diff --git a/filebeat/docs/reference/configuration/reload-configuration.asciidoc b/filebeat/docs/reference/configuration/reload-configuration.asciidoc new file mode 100644 index 00000000000..06df9d6b0b9 --- /dev/null +++ b/filebeat/docs/reference/configuration/reload-configuration.asciidoc @@ -0,0 +1,41 @@ +[[filebeat-configuration-reloading]] +=== Reload Configuration + +experimental[] + +Reload configuration allows to dynamically reload prospector configuration files. A glob can be defined which should be watched + for prospector configuration changes. New prospectors will be started / stopped accordingly. This is especially useful in + container environments where 1 container is used to tail logs from services in other containers on the same host. 
+ +The configuration in the main filebeat.yml config file looks as follows: + +[source,yaml] +------------------------------------------------------------------------------ +filebeat.reload.prospectors: + enabled: true + path: configs/*.yml + period: 10s +------------------------------------------------------------------------------ + +A path with a glob must be defined on which files should be checked for changes. A period is set on how often +the files are checked for changes. Do not set period below 1s as the modification time of files is often stored in seconds. +Setting it below 1s will cause unnecessary overhead. + +The configuration inside the files which are found by the glob looks as follows: +[source,yaml] +------------------------------------------------------------------------------ +- input_type: log + paths: + - /var/log/mysql.log + scan_frequency: 10s + +- input_type: log + paths: + - /var/log/apache.log + scan_frequency: 5s +------------------------------------------------------------------------------ + +Each file directly contains a list of prospectors. Each file can contain one or multiple prospector definitions. + +WARNING: It is critical that two running prospectors DO NOT have overlapping file paths defined. If more than one prospector +harvests the same file at the same time, it can lead to unexpected behaviour.
diff --git a/filebeat/prospector/prospector.go b/filebeat/prospector/prospector.go index 7e745889216..3ce402298af 100644 --- a/filebeat/prospector/prospector.go +++ b/filebeat/prospector/prospector.go @@ -39,7 +39,7 @@ type Prospector struct { } type Prospectorer interface { - Init(states []file.State) error + LoadStates(states []file.State) error Run() } @@ -47,7 +47,7 @@ type Outlet interface { OnEvent(event *input.Event) bool } -func NewProspector(cfg *common.Config, states []file.State, outlet Outlet) (*Prospector, error) { +func NewProspector(cfg *common.Config, outlet Outlet) (*Prospector, error) { prospector := &Prospector{ cfg: cfg, config: defaultConfig, @@ -72,18 +72,13 @@ func NewProspector(cfg *common.Config, states []file.State, outlet Outlet) (*Pro return nil, err } - err = prospector.Init(states) - if err != nil { - return nil, err - } - logp.Debug("prospector", "File Configs: %v", prospector.config.Paths) return prospector, nil } // Init sets up default config for prospector -func (p *Prospector) Init(states []file.State) error { +func (p *Prospector) LoadStates(states []file.State) error { var prospectorer Prospectorer var err error @@ -101,7 +96,7 @@ func (p *Prospector) Init(states []file.State) error { return err } - err = prospectorer.Init(states) + err = prospectorer.LoadStates(states) if err != nil { return err } diff --git a/filebeat/prospector/prospector_log.go b/filebeat/prospector/prospector_log.go index 3dd49547db0..b4ae09fc7e5 100644 --- a/filebeat/prospector/prospector_log.go +++ b/filebeat/prospector/prospector_log.go @@ -2,6 +2,7 @@ package prospector import ( "expvar" + "fmt" "os" "path/filepath" "runtime" @@ -34,10 +35,10 @@ func NewProspectorLog(p *Prospector) (*ProspectorLog, error) { return prospectorer, nil } -// Init sets up the prospector +// LoadStates loads states into prospector // It goes through all states coming from the registry. 
Only the states which match the glob patterns of // the prospector will be loaded and updated. All other states will not be touched. -func (p *ProspectorLog) Init(states []file.State) error { +func (p *ProspectorLog) LoadStates(states []file.State) error { logp.Debug("prospector", "exclude_files: %s", p.config.ExcludeFiles) for _, state := range states { @@ -45,6 +46,11 @@ func (p *ProspectorLog) Init(states []file.State) error { if p.matchesFile(state.Source) { state.TTL = -1 + // In case a prospector is tried to be started with an unfinished state matching the glob pattern + if !state.Finished { + return fmt.Errorf("Can only start a prospector when all related states are finished: %+v", state) + } + // Update prospector states and send new states to registry err := p.Prospector.updateState(input.NewEvent(state)) if err != nil { diff --git a/filebeat/prospector/prospector_log_other_test.go b/filebeat/prospector/prospector_log_other_test.go index 2a2fb0f013f..8108968576e 100644 --- a/filebeat/prospector/prospector_log_other_test.go +++ b/filebeat/prospector/prospector_log_other_test.go @@ -139,8 +139,13 @@ func TestInit(t *testing.T) { }, } states := file.NewStates() + // Set states to finished + for i, state := range test.states { + state.Finished = true + test.states[i] = state + } states.SetStates(test.states) - err := p.Init(states.GetStates()) + err := p.LoadStates(states.GetStates()) assert.NoError(t, err) assert.Equal(t, test.count, p.Prospector.states.Count()) } diff --git a/filebeat/prospector/prospector_stdin.go b/filebeat/prospector/prospector_stdin.go index 10b671d6b73..0279edcb4f3 100644 --- a/filebeat/prospector/prospector_stdin.go +++ b/filebeat/prospector/prospector_stdin.go @@ -17,7 +17,9 @@ type ProspectorStdin struct { // This prospector contains one harvester which is reading from stdin func NewProspectorStdin(p *Prospector) (*ProspectorStdin, error) { - prospectorer := &ProspectorStdin{} + prospectorer := &ProspectorStdin{ + started: false, + } 
var err error @@ -29,8 +31,7 @@ func NewProspectorStdin(p *Prospector) (*ProspectorStdin, error) { return prospectorer, nil } -func (p *ProspectorStdin) Init(states []file.State) error { - p.started = false +func (p *ProspectorStdin) LoadStates(states []file.State) error { return nil } diff --git a/filebeat/prospector/prospector_test.go b/filebeat/prospector/prospector_test.go index 720e63a67b3..f4ed5460def 100644 --- a/filebeat/prospector/prospector_test.go +++ b/filebeat/prospector/prospector_test.go @@ -19,7 +19,7 @@ func TestProspectorInitInputTypeLogError(t *testing.T) { states := file.NewStates() states.SetStates([]file.State{}) - err := prospector.Init(states.GetStates()) + err := prospector.LoadStates(states.GetStates()) // Error should be returned because no path is set assert.Error(t, err) } diff --git a/filebeat/prospector/registry.go b/filebeat/prospector/registry.go new file mode 100644 index 00000000000..71ca0c82253 --- /dev/null +++ b/filebeat/prospector/registry.go @@ -0,0 +1,46 @@ +package prospector + +import "sync" + +type registry struct { + sync.Mutex + List map[uint64]*Prospector +} + +func newRegistry() *registry { + return ®istry{ + List: map[uint64]*Prospector{}, + } +} + +func (r *registry) Add(hash uint64, m *Prospector) { + r.Lock() + defer r.Unlock() + r.List[hash] = m +} + +func (r *registry) Remove(hash uint64) { + r.Lock() + defer r.Unlock() + delete(r.List, hash) +} + +func (r *registry) Has(hash uint64) bool { + r.Lock() + defer r.Unlock() + + _, ok := r.List[hash] + return ok +} + +func (r *registry) CopyList() map[uint64]*Prospector { + r.Lock() + defer r.Unlock() + + // Create a copy of the list + list := map[uint64]*Prospector{} + for k, v := range r.List { + list[k] = v + } + return list +} diff --git a/filebeat/prospector/reloader.go b/filebeat/prospector/reloader.go new file mode 100644 index 00000000000..f8f21da39d7 --- /dev/null +++ b/filebeat/prospector/reloader.go @@ -0,0 +1,177 @@ +package prospector + +import ( + 
"expvar" + "path/filepath" + "sync" + "time" + + "github.com/elastic/beats/filebeat/registrar" + "github.com/elastic/beats/libbeat/cfgfile" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/paths" +) + +var ( + debugr = logp.MakeDebug("filebeat.reloader") + configReloads = expvar.NewInt("filebeat.config.reloads") + prospectorStarts = expvar.NewInt("filebeat.config.prospector.dyamic.starts") + prospectorStops = expvar.NewInt("filebeat.config.prospector.dyamic.stops") + prospectorRunning = expvar.NewInt("filebeat.config.prospector.dyamic.running") +) + +type ProspectorReloader struct { + registry *registry + config cfgfile.ReloadConfig + outlet Outlet + done chan struct{} + wg sync.WaitGroup + registrar *registrar.Registrar +} + +func NewProspectorReloader(cfg *common.Config, outlet Outlet, registrar *registrar.Registrar) *ProspectorReloader { + + config := cfgfile.DefaultReloadConfig + cfg.Unpack(&config) + + return &ProspectorReloader{ + registry: newRegistry(), + config: config, + outlet: outlet, + done: make(chan struct{}), + registrar: registrar, + } +} + +func (r *ProspectorReloader) Run() { + + logp.Info("Prospector reloader started") + + r.wg.Add(1) + defer r.wg.Done() + + // Stop all running prospectors when method finishes + defer r.stopProspectors(r.registry.CopyList()) + + path := r.config.Path + if !filepath.IsAbs(path) { + path = paths.Resolve(paths.Config, path) + } + + gw := cfgfile.NewGlobWatcher(path) + + for { + select { + case <-r.done: + logp.Info("Dynamic config reloader stopped") + return + case <-time.After(r.config.Period): + + debugr("Scan for new config files") + + files, updated, err := gw.Scan() + if err != nil { + // In most cases of error, updated == false, so will continue + // to next iteration below + logp.Err("Error fetching new config files: %v", err) + } + + // no file changes + if !updated { + continue + } + + configReloads.Add(1) + + // Load all config 
objects + configs := []*common.Config{} + for _, file := range files { + c, err := cfgfile.LoadList(file) + if err != nil { + logp.Err("Error loading config: %s", err) + continue + } + + configs = append(configs, c...) + } + + debugr("Number of prospectors configs created: %v", len(configs)) + + var startList []*Prospector + stopList := r.registry.CopyList() + + for _, c := range configs { + + // Only add prospectors to startlist which are enabled + if !c.Enabled() { + continue + } + + p, err := NewProspector(c, r.outlet) + if err != nil { + logp.Err("Error creating prospector: %s", err) + continue + } + + debugr("Remove prospector from stoplist: %v", p.ID) + delete(stopList, p.ID) + + // As prospector already exist, it must be removed from the stop list and not started + if !r.registry.Has(p.ID) { + debugr("Add prospector to startlist: %v", p.ID) + startList = append(startList, p) + continue + } + } + + r.stopProspectors(stopList) + r.startProspectors(startList) + } + } +} + +func (r *ProspectorReloader) startProspectors(prospectors []*Prospector) { + for _, p := range prospectors { + err := p.LoadStates(r.registrar.GetStates()) + if err != nil { + logp.Err("Error loading states for prospector %v: %v", p.ID, err) + continue + } + r.registry.Add(p.ID, p) + go func(pr *Prospector) { + prospectorStarts.Add(1) + prospectorRunning.Add(1) + defer func() { + r.registry.Remove(pr.ID) + logp.Info("Prospector stopped: %v", pr.ID) + }() + pr.Run() + }(p) + } + +} + +func (r *ProspectorReloader) stopProspectors(prospectors map[uint64]*Prospector) { + wg := sync.WaitGroup{} + for _, p := range prospectors { + wg.Add(1) + go func(pr *Prospector) { + defer wg.Done() + logp.Debug("reload", "stopping prospector: %v", pr.ID) + pr.Stop() + prospectorStops.Add(1) + prospectorRunning.Add(-1) + }(p) + } + wg.Wait() +} + +func (r *ProspectorReloader) Stop() { + close(r.done) + // Wait until reloading finished + r.wg.Wait() + + // Stop all prospectors + 
r.stopProspectors(r.registry.CopyList()) +} diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index c3cc5aa2220..5b34221f828 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -78,6 +78,12 @@ filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("reg {%endif%} filebeat.publish_async: {{publish_async}} +{% if reload -%} +filebeat.reload.prospectors: + path: {{ reload_path }} + period: 1s + enabled: true +{% endif -%} #================================ General ===================================== diff --git a/filebeat/tests/system/test_reload.py b/filebeat/tests/system/test_reload.py new file mode 100644 index 00000000000..27bf73afaef --- /dev/null +++ b/filebeat/tests/system/test_reload.py @@ -0,0 +1,140 @@ +import re +import sys +import unittest +import os +import time +from filebeat import BaseTest + + +prospectorConfigTemplate = """ +- input_type: log + paths: + - {} + scan_frequency: 1s +""" + +class Test(BaseTest): + + def test_reload(self): + """ + Test basic reload + """ + self.render_config_template( + reload=True, + reload_path=self.working_dir + "/configs/*.yml", + prospectors=False, + ) + + proc = self.start_beat() + + os.mkdir(self.working_dir + "/logs/") + logfile = self.working_dir + "/logs/test.log" + os.mkdir(self.working_dir + "/configs/") + + with open(self.working_dir + "/configs/prospector.yml", 'w') as f: + f.write(prospectorConfigTemplate.format(self.working_dir + "/logs/*")) + + with open(logfile, 'w') as f: + f.write("Hello world\n") + + self.wait_until(lambda: self.output_lines() > 0) + proc.check_kill_and_wait() + + def test_start_stop(self): + """ + Test basic start and stop + """ + self.render_config_template( + reload=True, + reload_path=self.working_dir + "/configs/*.yml", + prospectors=False, + ) + + proc = self.start_beat() + + os.mkdir(self.working_dir + "/logs/") + logfile = self.working_dir + 
"/logs/test.log" + os.mkdir(self.working_dir + "/configs/") + + with open(self.working_dir + "/configs/prospector.yml", 'w') as f: + f.write(prospectorConfigTemplate.format(self.working_dir + "/logs/*")) + + with open(logfile, 'w') as f: + f.write("Hello world\n") + + self.wait_until(lambda: self.output_lines() == 1) + + # Remove prospector + with open(self.working_dir + "/configs/prospector.yml", 'w') as f: + f.write("") + + # Wait until prospector is stopped + self.wait_until( + lambda: self.log_contains("Prospector stopped:"), + max_timeout=15) + + with open(logfile, 'a') as f: + f.write("Hello world\n") + + # Wait to give a change to pick up the new line (it shouldn't) + time.sleep(1) + + proc.check_kill_and_wait() + + assert self.output_lines() == 1 + + def test_start_stop_replace(self): + """ + Test basic start and replace with an other prospecto + """ + self.render_config_template( + reload=True, + reload_path=self.working_dir + "/configs/*.yml", + prospectors=False, + ) + + proc = self.start_beat() + + os.mkdir(self.working_dir + "/logs/") + logfile1 = self.working_dir + "/logs/test1.log" + logfile2 = self.working_dir + "/logs/test2.log" + os.mkdir(self.working_dir + "/configs/") + first_line = "First log file" + second_line = "Second log file" + + with open(self.working_dir + "/configs/prospector.yml", 'w') as f: + f.write(prospectorConfigTemplate.format(self.working_dir + "/logs/test1.log")) + + with open(logfile1, 'w') as f: + f.write(first_line + "\n") + + self.wait_until(lambda: self.output_lines() == 1) + + # Remove prospector + with open(self.working_dir + "/configs/prospector.yml", 'w') as f: + f.write("") + + # Wait until prospector is stopped + self.wait_until( + lambda: self.log_contains("Prospector stopped:"), + max_timeout=15) + + with open(self.working_dir + "/configs/prospector.yml", 'w') as f: + f.write(prospectorConfigTemplate.format(self.working_dir + "/logs/test2.log")) + + # Update both log files, only 1 change should be picke dup + with 
open(logfile1, 'a') as f: + f.write("First log file 1\n") + with open(logfile2, 'a') as f: + f.write(second_line + "\n") + + self.wait_until(lambda: self.output_lines() == 2) + + proc.check_kill_and_wait() + + output = self.read_output() + + # Make sure the correct lines were picked up + assert output[0]["message"] == first_line + assert output[1]["message"] == second_line + assert self.output_lines() == 2 diff --git a/libbeat/cfgfile/glob_watcher.go b/libbeat/cfgfile/glob_watcher.go index 38a328acb1a..30d6292af31 100644 --- a/libbeat/cfgfile/glob_watcher.go +++ b/libbeat/cfgfile/glob_watcher.go @@ -56,11 +56,15 @@ func (gw *GlobWatcher) Scan() ([]string, bool, error) { } // Check if one of the files was changed recently - // File modification time can be in seconds. -1 is to cover for files which + // File modification time can be in seconds. -1 + truncation is to cover for files which // were created during this second. - if info.ModTime().After(gw.lastScan.Truncate(time.Second)) { + // If the last scan was at 09:02:15.00001 it will pick up files which were modified also 09:02:14 + // As this scan no necessarly picked up files form 09:02:14 + // TODO: How could this be improved / simplified? Behaviour was sometimes flaky. Is ModTime updated with delay? 
+ if info.ModTime().After(gw.lastScan.Add(-1 * time.Second).Truncate(time.Second)) { updatedFiles = true } + files = append(files, f) } diff --git a/libbeat/cfgfile/glob_watcher_test.go b/libbeat/cfgfile/glob_watcher_test.go index 25405730541..65da24287f4 100644 --- a/libbeat/cfgfile/glob_watcher_test.go +++ b/libbeat/cfgfile/glob_watcher_test.go @@ -29,7 +29,7 @@ func TestGlobWatcher(t *testing.T) { assert.NoError(t, err) // Make sure not inside compensation time - time.Sleep(1 * time.Second) + time.Sleep(2 * time.Second) files, changed, err := gcd.Scan() assert.Equal(t, 2, len(files)) diff --git a/metricbeat/mb/module/config.go b/libbeat/cfgfile/reload.go similarity index 55% rename from metricbeat/mb/module/config.go rename to libbeat/cfgfile/reload.go index e81f5d73442..44f398ccec1 100644 --- a/metricbeat/mb/module/config.go +++ b/libbeat/cfgfile/reload.go @@ -1,19 +1,17 @@ -package module +package cfgfile import "time" -// ReloaderConfig contains config options for the module Reloader. -type ReloaderConfig struct { - // If path is a relative path, it is relative to the ${path.config} - Path string `config:"path"` - Period time.Duration `config:"period"` - Enabled bool `config:"enabled"` -} - var ( - // DefaultReloaderConfig contains the default config options. 
- DefaultReloaderConfig = ReloaderConfig{ + DefaultReloadConfig = ReloadConfig{ Period: 10 * time.Second, Enabled: false, } ) + +type ReloadConfig struct { + // If path is a relative path, it is relative to the ${path.config} + Path string `config:"path"` + Period time.Duration `config:"period"` + Enabled bool `config:"enabled"` +} diff --git a/libbeat/common/config.go b/libbeat/common/config.go index 5355ef2408e..593471acbdf 100644 --- a/libbeat/common/config.go +++ b/libbeat/common/config.go @@ -78,7 +78,7 @@ func MergeConfigs(cfgs ...*Config) (*Config, error) { func NewConfigWithYAML(in []byte, source string) (*Config, error) { opts := append( []ucfg.Option{ - ucfg.MetaData(ucfg.Meta{source}), + ucfg.MetaData(ucfg.Meta{Source: source}), }, configOpts..., ) @@ -94,7 +94,7 @@ func NewFlagConfig( ) *Config { opts := append( []ucfg.Option{ - ucfg.MetaData(ucfg.Meta{"command line flag"}), + ucfg.MetaData(ucfg.Meta{Source: "command line flag"}), }, configOpts..., ) @@ -289,7 +289,7 @@ func (f *flagOverwrite) String() string { func (f *flagOverwrite) Set(v string) error { opts := append( []ucfg.Option{ - ucfg.MetaData(ucfg.Meta{"command line flag"}), + ucfg.MetaData(ucfg.Meta{Source: "command line flag"}), }, configOpts..., ) diff --git a/metricbeat/mb/module/reload.go b/metricbeat/mb/module/reload.go index 9fb0ced0203..184555ffdc6 100644 --- a/metricbeat/mb/module/reload.go +++ b/metricbeat/mb/module/reload.go @@ -9,6 +9,7 @@ import ( "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/paths" "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/metricbeat/mb" ) @@ -23,7 +24,7 @@ var ( // Reloader is used to register and reload modules type Reloader struct { registry *registry - config ReloaderConfig + config cfgfile.ReloadConfig client func() publisher.Client done chan struct{} wg sync.WaitGroup @@ -32,7 +33,7 @@ type Reloader struct { // 
NewReloader creates new Reloader instance for the given config func NewReloader(cfg *common.Config, p publisher.Publisher) *Reloader { - config := DefaultReloaderConfig + config := cfgfile.DefaultReloadConfig cfg.Unpack(&config) return &Reloader{ @@ -56,7 +57,7 @@ func (r *Reloader) Run() { path := r.config.Path if !filepath.IsAbs(path) { - path = filepath.Join(cfgfile.GetPathConfig(), path) + path = paths.Resolve(paths.Config, path) } gw := cfgfile.NewGlobWatcher(path)