Skip to content

Commit

Permalink
Merge pull request #1068 from ruflin/close-older-backporting
Browse files Browse the repository at this point in the history
close_older backport
  • Loading branch information
tsg committed Mar 1, 2016
2 parents 413ecc9 + a798f49 commit 37ad40b
Show file tree
Hide file tree
Showing 15 changed files with 150 additions and 28 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.asciidoc
Expand Up @@ -19,7 +19,7 @@ https://github.com/elastic/beats/compare/v1.1.0...master[Check the HEAD diff]
*Topbeat*

*Filebeat*
- Fix registrar bug for rotated files {pull}1010[1010]
- Default config for ignore_older is now infinite instead of 24h, means ignore_older is disabled by default. Use close_older to only close file handlers.


*Winlogbeat*
Expand All @@ -34,6 +34,7 @@ https://github.com/elastic/beats/compare/v1.1.0...master[Check the HEAD diff]
*Topbeat*

*Filebeat*
- Fix registrar bug for rotated files {pull}1010[1010]


*Winlogbeat*
Expand All @@ -48,6 +49,8 @@ https://github.com/elastic/beats/compare/v1.1.0...master[Check the HEAD diff]
*Topbeat*

*Filebeat*
- Add close_older configuration option to complete ignore_older https://github.com/elastic/filebeat/issues/181[181]


*Winlogbeat*

Expand Down
5 changes: 4 additions & 1 deletion filebeat/config/config.go
Expand Up @@ -15,7 +15,8 @@ import (
// Defaults for config variables which are not set
const (
DefaultRegistryFile = ".filebeat"
DefaultIgnoreOlderDuration time.Duration = 24 * time.Hour
DefaultIgnoreOlderDuration time.Duration = 0
DefaultCloseOlderDuration time.Duration = 1 * time.Hour
DefaultScanFrequency time.Duration = 10 * time.Second
DefaultSpoolSize uint64 = 2048
DefaultIdleTimeout time.Duration = 5 * time.Second
Expand Down Expand Up @@ -48,6 +49,8 @@ type ProspectorConfig struct {
Input string
IgnoreOlder string `yaml:"ignore_older"`
IgnoreOlderDuration time.Duration
CloseOlder string `yaml:"close_older"`
CloseOlderDuration time.Duration
ScanFrequency string `yaml:"scan_frequency"`
ScanFrequencyDuration time.Duration
Harvester HarvesterConfig `yaml:",inline"`
Expand Down
3 changes: 2 additions & 1 deletion filebeat/config/config_test.go
Expand Up @@ -34,7 +34,8 @@ func TestReadConfig(t *testing.T) {
assert.Equal(t, "log", prospectors[0].Input)
assert.Equal(t, 3, len(prospectors[0].Harvester.Fields))
assert.Equal(t, "1", prospectors[0].Harvester.Fields["review"])
assert.Equal(t, "24h", prospectors[0].IgnoreOlder)
assert.Equal(t, "0", prospectors[0].IgnoreOlder)
assert.Equal(t, "1h", prospectors[0].CloseOlder)
assert.Equal(t, "10s", prospectors[0].ScanFrequency)

assert.Equal(t, "stdin", prospectors[2].Input)
Expand Down
6 changes: 6 additions & 0 deletions filebeat/crawler/prospector.go
Expand Up @@ -48,6 +48,11 @@ func (p *Prospector) setupProspectorConfig() error {
return err
}

config.CloseOlderDuration, err = getConfigDuration(config.CloseOlder, cfg.DefaultCloseOlderDuration, "close_older")
if err != nil {
return err
}

config.ScanFrequencyDuration, err = getConfigDuration(config.ScanFrequency, cfg.DefaultScanFrequency, "scan_frequency")
if err != nil {
return err
Expand Down Expand Up @@ -328,6 +333,7 @@ func (p *Prospector) checkNewFile(newinfo *harvester.FileStat, file string, outp
// Check for unmodified time, but only if the file modification time is before the last scan started
// This ensures we don't skip genuine creations with dead times less than 10s
if newinfo.Fileinfo.ModTime().Before(p.lastscan) &&
p.ProspectorConfig.IgnoreOlderDuration != 0 &&
time.Since(newinfo.Fileinfo.ModTime()) > p.ProspectorConfig.IgnoreOlderDuration {

logp.Debug("prospector", "Fetching old state of file to resume: %s", file)
Expand Down
17 changes: 17 additions & 0 deletions filebeat/crawler/prospector_test.go
Expand Up @@ -93,6 +93,23 @@ func TestProspectorInitScanFrequency0(t *testing.T) {
assert.Equal(t, zero, prospector.ProspectorConfig.ScanFrequencyDuration)
}

func TestProspectorInitCloseOlder0(t *testing.T) {

prospectorConfig := config.ProspectorConfig{
CloseOlder: "0",
}

prospector := Prospector{
ProspectorConfig: prospectorConfig,
}

prospector.Init()

var zero time.Duration = 0
// 0 expected
assert.Equal(t, zero, prospector.ProspectorConfig.CloseOlderDuration)
}

func TestProspectorInitInvalidScanFrequency(t *testing.T) {

prospectorConfig := config.ProspectorConfig{
Expand Down
47 changes: 33 additions & 14 deletions filebeat/docs/reference/configuration/filebeat-options.asciidoc
Expand Up @@ -50,7 +50,7 @@ The value that you specify here is used as the `input_type` for each event publi

A list of regular expressions to match the lines that you want Filebeat to exclude. Filebeat drops any lines that match a regular expression in the list. By default, no lines are dropped.

If <<multiline>> is also specified, each multiline message is combined into a single line before the lines are filtered by `exclude_lines`.
If <<multiline>> is also specified, each multiline message is combined into a single line before the lines are filtered by `exclude_lines`.

The following example configures Filebeat to drop any lines that start with "DBG".

Expand All @@ -63,7 +63,7 @@ exclude_lines: ["^DBG"]

A list of regular expressions to match the lines that you want Filebeat to include. Filebeat exports only the lines that match a regular expression in the list. By default, all lines are exported.

If <<multiline>> is also specified, each multiline message is combined into a single line before the lines are filtered by `include_lines`.
If <<multiline>> is also specified, each multiline message is combined into a single line before the lines are filtered by `include_lines`.

The following example configures Filebeat to export any lines that start with "ERR" or "WARN":

Expand Down Expand Up @@ -116,8 +116,27 @@ If the custom field names conflict with other field names added by Filebeat, the
===== ignore_older

If this option is specified, Filebeat
ignores any files that were modified before the specified timespan.
You can use time strings like 2h (2 hours) and 5m (5 minutes). The default is 24h.
ignores any files that were modified before the specified timespan. This is disabled by default.

You can use time strings like 2h (2 hours) and 5m (5 minutes). The default is 0, which means the option is disabled.
Commenting out the config option has the same effect as setting it to 0.

Files that fell under ignore_older and are updated again will resume
from the offset the file was at when it was last ignored by ignore_older. As an example:
a file was not modified for 90 hours and its offset is at 200. Now a new line is added and
the last modification date is updated. After scan_frequency detects the change, crawling
starts at offset 200. If the file already fell under ignore_older when Filebeat
was started, the first 200 characters are never sent. If Filebeat was started earlier, the first
200 characters were already sent and it now continues at the old offset.


===== close_older

After a file has not been modified for the duration of close_older, its file handle is closed.
After the file is closed, a change to it is only detected after the next scan_frequency interval,
instead of almost instantly.

You can use time strings like 2h (2 hours) and 5m (5 minutes). The default is 1h.


===== scan_frequency
Expand Down Expand Up @@ -147,9 +166,9 @@ This setting is especially useful for multiline log messages, which can get larg
[[multiline]]
===== multiline

Options that control how Filebeat deals with log messages that span multiple lines. Multiline messages are common in files that contain Java stack traces.
Options that control how Filebeat deals with log messages that span multiple lines. Multiline messages are common in files that contain Java stack traces.

The following example shows how to configure Filebeat to handle a multiline message where the first line of the message begins with a bracket (`[`).
The following example shows how to configure Filebeat to handle a multiline message where the first line of the message begins with a bracket (`[`).

[source,yaml]
-------------------------------------------------------------------------------------
Expand All @@ -160,7 +179,7 @@ multiline:
-------------------------------------------------------------------------------------

Filebeat takes all the lines that do not start with `[` and combines them with the previous line that does. For example, you could use this configuration to join the following lines of a multiline message into a single event:
Filebeat takes all the lines that do not start with `[` and combines them with the previous line that does. For example, you could use this configuration to join the following lines of a multiline message into a single event:

["source","sh",subs="attributes,callouts"]
-------------------------------------------------------------------------------------
Expand All @@ -173,7 +192,7 @@ Filebeat takes all the lines that do not start with `[` and combines them with t

You specify the following settings under `multiline` to control how Filebeat combines the lines in the message:

*`pattern`*:: Specifies the regular expression pattern to match.
*`pattern`*:: Specifies the regular expression pattern to match.

*`negate`*:: Defines whether the pattern is negated. The default is `false`.

Expand All @@ -182,10 +201,10 @@ You specify the following settings under `multiline` to control how Filebeat com
[options="header"]
|=======================
|Setting for `negate` | Setting for `match` | Result
|`false` | `after` | Consecutive lines that match the pattern are appended to the previous line that doesn't match.
|`false` | `before` | Consecutive lines that match the pattern are prepended to the next line that doesn't match.
|`true` | `after` | Consecutive lines that don't match the pattern are appended to the previous line that does match.
|`true` | `before` | Consecutive lines that don't match the pattern are prepended to the next line that does match.
|`false` | `after` | Consecutive lines that match the pattern are appended to the previous line that doesn't match.
|`false` | `before` | Consecutive lines that match the pattern are prepended to the next line that doesn't match.
|`true` | `after` | Consecutive lines that don't match the pattern are appended to the previous line that does match.
|`true` | `before` | Consecutive lines that don't match the pattern are prepended to the next line that does match.
|=======================
+
NOTE: The `after` setting is equivalent to `previous` in https://www.elastic.co/guide/en/logstash/current/plugins-codecs-multiline.html[Logstash], and `before` is equivalent to `next`.
Expand All @@ -196,7 +215,7 @@ lines are discarded. The default is 500.

*`timeout`*:: After the specified timeout, Filebeat sends the multiline event even if no new pattern is found to start a new event. The default is 5s.

Here's an example configuration that shows the regular expression for a slightly more complex example:
Here's an example configuration that shows the regular expression for a slightly more complex example:

["source","sh",subs="attributes,callouts"]
-------------------------------------------------------------------------------------
Expand All @@ -211,7 +230,7 @@ In this example, the pattern matches the following lines:
* a line that begins with spaces followed by the word `at` or `...`
* a line that begins with the words `Caused by:`

You could use this configuration to join the following lines from a Java stack trace into a single event:
You could use this configuration to join the following lines from a Java stack trace into a single event:

["source","sh",subs="attributes,callouts"]
-------------------------------------------------------------------------------------
Expand Down
10 changes: 8 additions & 2 deletions filebeat/etc/beat.yml
Expand Up @@ -56,9 +56,15 @@ filebeat:
# fields.
#fields_under_root: false

# Ignore files which were modified more then the defined timespan in the past
# Ignore files which were modified more than the defined timespan in the past.
# If all files on your system must be read, you can set this value very large.
# Time strings like 2h (2 hours), 5m (5 minutes) can be used.
#ignore_older: 24h
#ignore_older: 0

# Close older closes the file handle of files that were not modified
# for longer than close_older.
# Time strings like 2h (2 hours), 5m (5 minutes) can be used.
#close_older: 1h

# Type to be published in the 'type' field. For Elasticsearch output,
# the type defines the document type these entries should be stored
Expand Down
13 changes: 11 additions & 2 deletions filebeat/etc/filebeat.yml
Expand Up @@ -56,9 +56,15 @@ filebeat:
# fields.
#fields_under_root: false

# Ignore files which were modified more then the defined timespan in the past
# Ignore files which were modified more than the defined timespan in the past.
# If all files on your system must be read, you can set this value very large.
# Time strings like 2h (2 hours), 5m (5 minutes) can be used.
#ignore_older: 24h
#ignore_older: 72h

# Close older closes the file handle of files that were not modified
# for longer than close_older.
# Time strings like 2h (2 hours), 5m (5 minutes) can be used.
#close_older: 1h

# Type to be published in the 'type' field. For Elasticsearch output,
# the type defines the document type these entries should be stored
Expand Down Expand Up @@ -145,6 +151,9 @@ filebeat:
# Event count spool threshold - forces network flush if exceeded
#spool_size: 2048

# Enable async publisher pipeline in filebeat (Experimental!)
#publish_async: false

# Defines how often the spooler is flushed. After idle_timeout the spooler is
# Flush even though spool_size is not reached.
#idle_timeout: 5s
Expand Down
2 changes: 1 addition & 1 deletion filebeat/harvester/log.go
Expand Up @@ -118,7 +118,7 @@ func (h *Harvester) Harvest() {
config := h.Config
readerConfig := logFileReaderConfig{
forceClose: config.ForceCloseFiles,
maxInactive: h.ProspectorConfig.IgnoreOlderDuration,
closeOlderDuration: h.ProspectorConfig.CloseOlderDuration,
backoffDuration: config.BackoffDuration,
maxBackoffDuration: config.MaxBackoffDuration,
backoffFactor: config.BackoffFactor,
Expand Down
2 changes: 1 addition & 1 deletion filebeat/harvester/log_test.go
Expand Up @@ -57,7 +57,7 @@ func TestReadLine(t *testing.T) {
// Read only 10 bytes which is not the end of the file
codec, _ := encoding.Plain(file)
readConfig := logFileReaderConfig{
maxInactive: 500 * time.Millisecond,
closeOlderDuration: 500 * time.Millisecond,
backoffDuration: 100 * time.Millisecond,
maxBackoffDuration: 1 * time.Second,
backoffFactor: 2,
Expand Down
6 changes: 3 additions & 3 deletions filebeat/harvester/reader.go
Expand Up @@ -22,7 +22,7 @@ type logFileReader struct {

type logFileReaderConfig struct {
forceClose bool
maxInactive time.Duration
closeOlderDuration time.Duration
backoffDuration time.Duration
maxBackoffDuration time.Duration
backoffFactor int
Expand Down Expand Up @@ -112,8 +112,8 @@ func (r *logFileReader) Read(buf []byte) (int, error) {
}

age := time.Since(r.lastTimeRead)
if age > r.config.maxInactive {
// If the file hasn't change for longer then maxInactive, harvester stops
if age > r.config.closeOlderDuration {
// If the file hasn't change for longer then closeOlder, harvester stops
// and file handle will be closed.
return n, errInactive
}
Expand Down
3 changes: 2 additions & 1 deletion filebeat/tests/files/config.yml
Expand Up @@ -12,7 +12,8 @@ filebeat:
level: debug
review: 1
type: log
ignore_older: 24h
ignore_older: 0
close_older: 1h
scan_frequency: 10s
harvester_buffer_size: 5000
tail_files: false
Expand Down
3 changes: 2 additions & 1 deletion filebeat/tests/load/filebeat.yml
Expand Up @@ -8,7 +8,8 @@ filebeat:
#fields:
# level: debug
# review: 1
ignore_older: 24h
ignore_older: 0
close_older: 1h
scan_frequency: 0s
harvester_buffer_size: 1000000

Expand Down
1 change: 1 addition & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
Expand Up @@ -9,6 +9,7 @@ filebeat:
input_type: {{input_type | default("log") }}
scan_frequency: {{scan_frequency | default("0.1s") }}
ignore_older: "{{ignoreOlder}}"
close_older: "{{closeOlder}}"
harvester_buffer_size:
encoding: {{encoding | default("utf-8") }}
tail_files: {{tailFiles}}
Expand Down
55 changes: 55 additions & 0 deletions filebeat/tests/system/test_prospector.py
Expand Up @@ -114,6 +114,7 @@ def test_rotating_ignore_older_larger_write_rate(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
ignoreOlder="1s",
closeOlder="1s",
scan_frequency="0.1s",
)

Expand Down Expand Up @@ -174,3 +175,57 @@ def test_exclude_files(self):
# Check that output file has the same number of lines as the log file
assert 1 == len(output)
assert output[0]["message"] == "line in log file"


def test_close_older(self):
    """
    Test that close_older closes the file but reading
    is picked up again after scan_frequency
    """
    self.render_config_template(
        path=os.path.abspath(self.working_dir) + "/log/*",
        ignoreOlder="1h",
        closeOlder="1s",
        scan_frequency="0.1s",
    )

    os.mkdir(self.working_dir + "/log/")
    testfile = self.working_dir + "/log/test.log"

    proc = self.start_filebeat(debug_selectors=['*'])

    # Wait until the prospector has run its first scan before writing.
    self.wait_until(
        lambda: self.log_contains("Start next scan"),
        max_timeout=10)

    line_count = 0

    # Append the first line and wait for it to reach the output.
    line_count += 1
    with open(testfile, 'a') as f:
        f.write("Line %d\n" % line_count)

    self.wait_until(
        lambda: self.output_has(lines=line_count),
        max_timeout=15)

    # The harvester must release the file handle once close_older expires.
    self.wait_until(
        lambda: self.log_contains(
            "Closing file: {}\n".format(os.path.abspath(testfile))),
        max_timeout=10)

    # Append a second line; the next scan must pick the file up again.
    line_count += 1
    with open(testfile, 'a') as f:
        f.write("Line %d\n" % line_count)

    self.wait_until(
        # Events may be sent more than once due to log rotation.
        lambda: self.output_count(lambda x: x >= line_count),
        max_timeout=5)

    proc.kill_and_wait()

0 comments on commit 37ad40b

Please sign in to comment.