Skip to content

Commit

Permalink
Add state for ignore_older files
Browse files Browse the repository at this point in the history
Previously if a file was falling under ignore_older on startup and no previous state was found, also no state was persisted. This was important in 1.x releases to prevent the registry file to contain too many files as no cleanup was possible. With the new clean_* options the registry file can now be cleaned up. For consistency all files that fall under ignore_older should have a state, even if they are not crawled before.

This change is also required to move tail_files under the prospector (#2613). Otherwise files which fall under `ignore_older` the state could not be set to the end of the file.
  • Loading branch information
ruflin committed Nov 1, 2016
1 parent 470d8b1 commit 8aebbda
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Expand Up @@ -72,6 +72,7 @@ https://github.com/elastic/beats/compare/v5.0.0...master[Check the HEAD diff]
- Add command line option -once to run filebeat only once and then close. {pull}2456[2456]
- Only load matching states into prospector to improve state handling {pull}2840[2840]
- Reset all states ttl on startup to make sure it is overwritten by new config {pull}2840[2840]
- Persist all states for files which fall under ignore_older to have consistent behaviour {pull}2859[2859]

*Winlogbeat*

Expand Down
Expand Up @@ -175,7 +175,7 @@ The files affected by this setting fall into two categories:
* Files that were never harvested
* Files that were harvested but weren't updated for longer than `ignore_older`

When a file that has never been harvested is updated, the reading starts from the beginning of the file because no state has been persisted. For a file that has been harvested previously, the state still exists, so when the file is updated, reading continues at the last position.
When a file that has never been harvested is updated, the reading starts from the beginning as the state of the file was created with the offset 0. For a file that has been harvested previously, reading continues at the last position.

The `ignore_older` setting relies on the modification time of the file to determine if a file is ignored. If the modification time of the file is not updated when lines are written to a file (which can happen on Windows), the `ignore_older` setting may cause Filebeat to ignore files even though content was added at a later time.

Expand Down Expand Up @@ -527,14 +527,14 @@ before Filebeat shuts down.

By default, this option is disabled, and Filebeat does not wait for the
publisher to finish sending events before shutting down. This means that any
events sent to the output, but not acknowledged before Filebeat shuts down,
events sent to the output, but not acknowledged before Filebeat shuts down,
are sent again when you restart Filebeat. For more details about how this
works, see <<at-least-once-delivery>>.

You can configure the `shutdown_timeout` option to specify the maximum amount
of time that Filebeat waits for the publisher to finish sending events before
shutting down. If all events are acknowledged before `shutdown_timeout` is
reached, Filebeat will shut down.
reached, Filebeat will shut down.

There is no recommended setting for this option because determining the correct
value for `shutdown_timeout` depends heavily on the environment in which
Expand Down
53 changes: 48 additions & 5 deletions filebeat/prospector/prospector_log.go
Expand Up @@ -198,11 +198,9 @@ func (p *ProspectorLog) scan() {

// Ignores all files which fall under ignore_older
if p.isIgnoreOlder(newState) {
logp.Debug("prospector", "Ignore file because ignore_older reached: %s", newState.Source)

// If last state is empty, it means state was removed or never created -> can be ignored
if !lastState.IsEmpty() && !lastState.Finished {
logp.Err("File is falling under ignore_older before harvesting is finished. Adjust your close_* settings: %s", newState.Source)
err := p.handleIgnoreOlder(lastState, newState)
if err != nil {
logp.Err("Updating ignore_older state error: %s", err)
}
continue
}
Expand Down Expand Up @@ -281,6 +279,35 @@ func (p *ProspectorLog) harvestExistingFile(newState file.State, oldState file.S
}
}

// handleIgnoreOlder handles states which fall under ignore older
// Based on the state information it is decided if the state information has to be updated or not
func (p *ProspectorLog) handleIgnoreOlder(lastState, newState file.State) error {
logp.Debug("prospector", "Ignore file because ignore_older reached: %s", newState.Source)

if !lastState.IsEmpty() {
if !lastState.Finished {
logp.Info("File is falling under ignore_older before harvesting is finished. Adjust your close_* settings: %s", newState.Source)
}
// Old state exist, no need to update it
return nil
}

// Make sure file is not falling under clean_inactive yet
if p.isCleanInactive(newState) {
logp.Debug("prospector", "Do not write state for ignore_older because clean_inactive reached")
return nil
}

// Write state for ignore_older file as none exists yet
newState.Finished = true
err := p.Prospector.updateState(input.NewEvent(newState))
if err != nil {
return err
}

return nil
}

// isFileExcluded checks if the given path should be excluded
func (p *ProspectorLog) isFileExcluded(file string) bool {
patterns := p.config.ExcludeFiles
Expand All @@ -302,3 +329,19 @@ func (p *ProspectorLog) isIgnoreOlder(state file.State) bool {

return false
}

// isCleanInactive checks if the given state false under clean_inactive
func (p *ProspectorLog) isCleanInactive(state file.State) bool {

// clean_inactive is disable
if p.config.CleanInactive <= 0 {
return false
}

modTime := state.Fileinfo.ModTime()
if time.Since(modTime) > p.config.CleanInactive {
return true
}

return false
}
64 changes: 64 additions & 0 deletions filebeat/prospector/prospector_log_test.go
@@ -0,0 +1,64 @@
// +build !integration

package prospector

import (
"os"
"testing"
"time"

"github.com/elastic/beats/filebeat/input/file"
"github.com/stretchr/testify/assert"
)

var cleanInactiveTests = []struct {
cleanInactive time.Duration
fileTime time.Time
result bool
}{
{
cleanInactive: 0,
fileTime: time.Now(),
result: false,
},
{
cleanInactive: 1 * time.Second,
fileTime: time.Now().Add(-5 * time.Second),
result: true,
},
{
cleanInactive: 10 * time.Second,
fileTime: time.Now().Add(-5 * time.Second),
result: false,
},
}

func TestIsCleanInactive(t *testing.T) {

for _, test := range cleanInactiveTests {

prospector := ProspectorLog{
config: prospectorConfig{
CleanInactive: test.cleanInactive,
},
}
state := file.State{
Fileinfo: TestFileInfo{
time: test.fileTime,
},
}

assert.Equal(t, test.result, prospector.isCleanInactive(state))
}
}

type TestFileInfo struct {
time time.Time
}

func (t TestFileInfo) Name() string { return "" }
func (t TestFileInfo) Size() int64 { return 0 }
func (t TestFileInfo) Mode() os.FileMode { return 0 }
func (t TestFileInfo) ModTime() time.Time { return t.time }
func (t TestFileInfo) IsDir() bool { return false }
func (t TestFileInfo) Sys() interface{} { return nil }
5 changes: 2 additions & 3 deletions filebeat/tests/system/test_harvester.py
Expand Up @@ -413,11 +413,10 @@ def test_close_timeout(self):
data = self.get_registry()
assert len(data) == 1

# Check that not all but some lines were read
assert self.output_lines() < 1000
# Check that not all but some lines were read. It can happen sometimes that filebeat finishes reading ...
assert self.output_lines() <= 1000
assert self.output_lines() > 0


def test_bom_utf8(self):
"""
Test utf8 log file with bom
Expand Down
91 changes: 91 additions & 0 deletions filebeat/tests/system/test_registrar.py
Expand Up @@ -1285,3 +1285,94 @@ def test_restart_state_reset_ttl_no_clean_inactive(self):
data = self.get_registry()
assert len(data) == 1
assert data[0]["ttl"] == -1

def test_ignore_older_state(self):
"""
Check that state is also persisted for files falling under ignore_older on startup
without a previous state
"""

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
close_inactive="1s",
ignore_older="1s",
)
os.mkdir(self.working_dir + "/log/")

testfile1 = self.working_dir + "/log/test.log"

with open(testfile1, 'w') as file:
file.write("Hello World\n")

time.sleep(1)

filebeat = self.start_beat()

# Make sure file falls under ignore_older
self.wait_until(
lambda: self.log_contains("Ignore file because ignore_older reached"),
max_timeout=10)

# Make sure state is loaded for file
self.wait_until(
lambda: self.log_contains("Before: 1, After: 1"),
max_timeout=10)

# Make sure state is written
self.wait_until(
lambda: self.log_contains("Registry file updated. 1 states written."),
max_timeout=10)

filebeat.check_kill_and_wait()

data = self.get_registry()
assert len(data) == 1

# Check that offset is 0 even though there is content in it
assert data[0]["offset"] == 0

def test_ignore_older_state_clean_inactive(self):
"""
Check that state for ignore_older is not persisted when falling under clean_inactive
"""

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
close_inactive="1s",
clean_inactive="2s",
ignore_older="1s",
)
os.mkdir(self.working_dir + "/log/")

testfile1 = self.working_dir + "/log/test.log"

with open(testfile1, 'w') as file:
file.write("Hello World\n")

time.sleep(2)

filebeat = self.start_beat()

# Make sure file falls under ignore_older
self.wait_until(
lambda: self.log_contains("Ignore file because ignore_older reached"),
max_timeout=10)

self.wait_until(
lambda: self.log_contains("Do not write state for ignore_older because clean_inactive reached"),
max_timeout=10)

# Make sure state is loaded for file
self.wait_until(
lambda: self.log_contains("Before: 0, After: 0"),
max_timeout=10)

# Make sure state is written
self.wait_until(
lambda: self.log_contains("Registry file updated. 0 states written."),
max_timeout=10)

filebeat.check_kill_and_wait()

data = self.get_registry()
assert len(data) == 0

0 comments on commit 8aebbda

Please sign in to comment.