Skip to content

Commit

Permalink
Filter out duplicate paths resolved from matching globs (#36256)
Browse files Browse the repository at this point in the history
Filter out duplicate paths resolved from matching globs

Multiple globs can resolve to the same filename. These duplicates
must be filtered out before file descriptors are created and
additional checks are run; otherwise they cause a flood of warning
messages in the logs.

These warnings should be emitted only when a symlink
resolves to an already known filename.
  • Loading branch information
rdner committed Aug 8, 2023
1 parent d6d318f commit 4d0276a
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix panic when sqs input metrics getter is invoked {pull}36101[36101] {issue}36077[36077]
- Make CEL input's `now` global variable static for evaluation lifetime. {pull}36107[36107]
- Update mito CEL extension library to v1.5.0. {pull}36146[36146]
- Filter out duplicate paths resolved from matching globs. {issue}36253[36253] {pull}36256[36256]

*Heartbeat*

Expand Down
9 changes: 8 additions & 1 deletion filebeat/input/filestream/fswatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,8 @@ func (s *fileScanner) GetFiles() map[string]loginp.FileDescriptor {
fdByName := map[string]loginp.FileDescriptor{}
// used to determine if a symlink resolves in a already known target
uniqueIDs := map[string]string{}

// used to filter out duplicate matches
uniqueFiles := map[string]struct{}{}
for _, path := range s.paths {
matches, err := filepath.Glob(path)
if err != nil {
Expand All @@ -366,6 +367,12 @@ func (s *fileScanner) GetFiles() map[string]loginp.FileDescriptor {
}

for _, filename := range matches {
// in case multiple globs match on the same file we filter out duplicates
if _, knownFile := uniqueFiles[filename]; knownFile {
continue
}
uniqueFiles[filename] = struct{}{}

it, err := s.getIngestTarget(filename)
if err != nil {
s.log.Debugf("cannot create an ingest target for file %q: %s", filename, err)
Expand Down
57 changes: 57 additions & 0 deletions filebeat/input/filestream/fswatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,63 @@ scanner:
// means no event
require.Equal(t, loginp.OpDone, e.Op)
})

t.Run("does not log warnings on duplicate globs and filters out duplicates", func(t *testing.T) {
dir := t.TempDir()
firstBasename := "file-123.ndjson"
secondBasename := "file-watcher-123.ndjson"
firstFilename := filepath.Join(dir, firstBasename)
secondFilename := filepath.Join(dir, secondBasename)
err := os.WriteFile(firstFilename, []byte("line\n"), 0777)
require.NoError(t, err)
err = os.WriteFile(secondFilename, []byte("line\n"), 0777)
require.NoError(t, err)

paths := []string{
// to emulate the case we have in the agent monitoring
filepath.Join(dir, "file-*.ndjson"),
filepath.Join(dir, "file-watcher-*.ndjson"),
}
cfgStr := `
scanner:
check_interval: 100ms
`

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

err = logp.DevelopmentSetup(logp.ToObserverOutput())
require.NoError(t, err)

fw := createWatcherWithConfig(t, paths, cfgStr)

go fw.Run(ctx)

e := fw.Event()
expEvent := loginp.FSEvent{
NewPath: firstFilename,
Op: loginp.OpCreate,
Descriptor: loginp.FileDescriptor{
Filename: firstFilename,
Info: testFileInfo{name: firstBasename, size: 5}, // "line\n"
},
}
requireEqualEvents(t, expEvent, e)

e = fw.Event()
expEvent = loginp.FSEvent{
NewPath: secondFilename,
Op: loginp.OpCreate,
Descriptor: loginp.FileDescriptor{
Filename: secondFilename,
Info: testFileInfo{name: secondBasename, size: 5}, // "line\n"
},
}
requireEqualEvents(t, expEvent, e)

logs := logp.ObserverLogs().FilterLevelExact(logp.WarnLevel.ZapLevel()).TakeAll()
require.Lenf(t, logs, 0, "must be no warning messages, got: %v", logs)
})
}

func TestFileScanner(t *testing.T) {
Expand Down

0 comments on commit 4d0276a

Please sign in to comment.