diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go
index 479e939ff39..34432fce340 100644
--- a/filebeat/input/filestream/environment_test.go
+++ b/filebeat/input/filestream/environment_test.go
@@ -21,9 +21,11 @@ package filestream
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"os"
 	"path/filepath"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -64,6 +66,24 @@ type registryEntry struct {
 }
 
 func newInputTestingEnvironment(t *testing.T) *inputTestingEnvironment {
+	if err := logp.DevelopmentSetup(logp.ToObserverOutput()); err != nil {
+		t.Fatalf("error setting up dev logging: %s", err)
+	}
+
+	t.Cleanup(func() {
+		if t.Failed() {
+			t.Logf("Debug Logs:\n")
+			for _, log := range logp.ObserverLogs().TakeAll() {
+				data, err := json.Marshal(log)
+				if err != nil {
+					t.Errorf("failed encoding log as JSON: %s", err)
+				}
+				t.Logf("%s", string(data))
+			}
+			return
+		}
+	})
+
 	return &inputTestingEnvironment{
 		t:          t,
 		workingDir: t.TempDir(),
@@ -194,6 +214,8 @@ func (e *inputTestingEnvironment) requireRegistryEntryCount(expectedCount int) {
 // requireOffsetInRegistry checks if the expected offset is set for a file.
 func (e *inputTestingEnvironment) requireOffsetInRegistry(filename, inputID string, expectedOffset int) {
 	e.t.Helper()
+	var offsetStr strings.Builder
+
 	filepath := e.abspath(filename)
 	fi, err := os.Stat(filepath)
 	if err != nil {
@@ -202,16 +224,23 @@ func (e *inputTestingEnvironment) requireOffsetInRegistry(filename, inputID stri
 	id := getIDFromPath(filepath, inputID, fi)
 
 	var entry registryEntry
-	require.Eventually(e.t, func() bool {
+	require.Eventuallyf(e.t, func() bool {
+		offsetStr.Reset()
+
 		entry, err = e.getRegistryState(id)
 		if err != nil {
-			return true
+			e.t.Fatalf("could not get state for '%s' from registry, err: %s", id, err)
 		}
+		fmt.Fprint(&offsetStr, entry.Cursor.Offset)
+
 		return expectedOffset == entry.Cursor.Offset
-	}, time.Second, time.Millisecond)
-	require.NoError(e.t, err)
-	require.Equal(e.t, expectedOffset, entry.Cursor.Offset)
+	},
+		time.Second,
+		100*time.Millisecond,
+		"expected offset: '%d', cursor offset: '%s'",
+		expectedOffset,
+		&offsetStr)
 }
 
 // requireMetaInRegistry checks if the expected metadata is saved to the registry.
@@ -251,7 +280,16 @@ func requireMetadataEquals(one, other fileMeta) bool {
 }
 
 // waitUntilOffsetInRegistry waits for the expected offset is set for a file.
-func (e *inputTestingEnvironment) waitUntilOffsetInRegistry(filename, inputID string, expectedOffset int) {
+// If timeout is reached or there is an error getting the state from the
+// registry, the test fails.
+func (e *inputTestingEnvironment) waitUntilOffsetInRegistry(
+	filename, inputID string,
+	expectedOffset int,
+	timeout time.Duration) {
+
+	var cursorString strings.Builder
+	var fileSizeString strings.Builder
+
 	filepath := e.abspath(filename)
 	fi, err := os.Stat(filepath)
 	if err != nil {
@@ -259,12 +297,34 @@ func (e *inputTestingEnvironment) waitUntilOffsetInRegistry(filename, inputID st
 	}
 	id := getIDFromPath(filepath, inputID, fi)
 
-	entry, err := e.getRegistryState(id)
-	for err != nil || entry.Cursor.Offset != expectedOffset {
-		entry, err = e.getRegistryState(id)
-	}
-	require.Equal(e.t, expectedOffset, entry.Cursor.Offset)
+	require.Eventuallyf(e.t, func() bool {
+		cursorString.Reset()
+		fileSizeString.Reset()
+
+		entry, err := e.getRegistryState(id)
+		if err != nil {
+			e.t.Fatalf(
+				"error getting state for ID '%s' from the registry, err: %s",
+				id, err)
+		}
+
+		fi, err := os.Stat(filepath)
+		if err != nil {
+			e.t.Fatalf("could not stat '%s', err: %s", filepath, err)
+		}
+
+		fileSizeString.WriteString(fmt.Sprint(fi.Size()))
+		cursorString.WriteString(fmt.Sprint(entry.Cursor.Offset))
+
+		return entry.Cursor.Offset == expectedOffset
+	},
+		timeout,
+		100*time.Millisecond,
+		"expected offset: '%d', cursor offset: '%s', file size: '%s'",
+		expectedOffset,
+		&cursorString,
+		&fileSizeString)
 }
 
 func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename, inputID string) {
diff --git a/filebeat/input/filestream/filestream.go b/filebeat/input/filestream/filestream.go
index e59c5725673..7dec0182f45 100644
--- a/filebeat/input/filestream/filestream.go
+++ b/filebeat/input/filestream/filestream.go
@@ -171,7 +171,9 @@ func (f *logFile) periodicStateCheck(ctx unison.Canceler) {
 		return nil
 	})
 	if err != nil {
-		f.log.Errorf("failed to schedule a periodic state check: %w", err)
+		if !errors.Is(err, context.Canceled) {
+			f.log.Errorf("failed to schedule a periodic state check: %s", err)
+		}
 	}
 }
 
diff --git a/filebeat/input/filestream/fswatch.go b/filebeat/input/filestream/fswatch.go
index be62209bb35..68ff28f47dd 100644
--- a/filebeat/input/filestream/fswatch.go
+++ b/filebeat/input/filestream/fswatch.go
@@ -153,8 +153,12 @@ func (w *fileWatcher) watch(ctx unison.Canceler) {
 		}
 
 		// if the two infos belong to the same file and it has been modified
-		// if the size is smaller than before, it is truncated, if bigger, it is a write event
-		if prevInfo.ModTime() != info.ModTime() {
+		// if the size is smaller than before, it is truncated; if bigger, it is a write event.
+		// It might happen that a file is truncated and then more data is added, both
+		// within the same second. This will make the reader stop, but a new one will not
+		// start because the modification time is the same. To avoid this situation,
+		// we also check for size changes here.
+		if prevInfo.ModTime() != info.ModTime() || prevInfo.Size() != info.Size() {
 			if prevInfo.Size() > info.Size() || w.resendOnModTime && prevInfo.Size() == info.Size() {
 				select {
 				case <-ctx.Done():
@@ -392,7 +396,7 @@ func (s *fileScanner) isOriginalAndSymlinkConfigured(file string, uniqFileID map
 	}
 	fileID := file_helper.GetOSState(fileInfo).String()
 	if finfo, exists := uniqFileID[fileID]; exists {
-		s.log.Info("Same file found as symlink and original. Skipping file: %s (as it same as %s)", file, finfo.Name())
+		s.log.Infof("Same file found as symlink and original. Skipping file: %s (as it same as %s)", file, finfo.Name())
 		return true
 	}
 	uniqFileID[fileID] = fileInfo
diff --git a/filebeat/input/filestream/fswatch_test.go b/filebeat/input/filestream/fswatch_test.go
index 440b92f1c6a..da656a1ca0f 100644
--- a/filebeat/input/filestream/fswatch_test.go
+++ b/filebeat/input/filestream/fswatch_test.go
@@ -221,6 +221,77 @@ func TestFileWatchNewDeleteModified(t *testing.T) {
 	}
 }
 
+func TestFileWatcherTruncate(t *testing.T) {
+	oldTs := time.Now()
+	newTs := oldTs.Add(time.Second)
+	testCases := map[string]struct {
+		prevFiles      map[string]os.FileInfo
+		nextFiles      map[string]os.FileInfo
+		expectedEvents []loginp.FSEvent
+	}{
+		"truncated file, only size changes": {
+			prevFiles: map[string]os.FileInfo{
+				"path": testFileInfo{"path", 42, oldTs, nil},
+			},
+			nextFiles: map[string]os.FileInfo{
+				"path": testFileInfo{"path", 0, oldTs, nil},
+			},
+			expectedEvents: []loginp.FSEvent{
+				{Op: loginp.OpTruncate, OldPath: "path", NewPath: "path", Info: testFileInfo{"path", 0, oldTs, nil}},
+			},
+		},
+		"truncated file, mod time and size changes": {
+			prevFiles: map[string]os.FileInfo{
+				"path": testFileInfo{"path", 42, oldTs, nil},
+			},
+			nextFiles: map[string]os.FileInfo{
+				"path": testFileInfo{"path", 0, newTs, nil},
+			},
+			expectedEvents: []loginp.FSEvent{
+				{Op: loginp.OpTruncate, OldPath: "path", NewPath: "path", Info: testFileInfo{"path", 0, newTs, nil}},
+			},
+		},
+		"no file change": {
+			prevFiles: map[string]os.FileInfo{
+				"path": testFileInfo{"path", 42, oldTs, nil},
+			},
+			nextFiles: map[string]os.FileInfo{
+				"path": testFileInfo{"path", 42, oldTs, nil},
+			},
+			expectedEvents: []loginp.FSEvent{},
+		},
+	}
+
+	for name, test := range testCases {
+		t.Run(name, func(t *testing.T) {
+			w := fileWatcher{
+				log:          logp.L(),
+				prev:         test.prevFiles,
+				scanner:      &mockScanner{test.nextFiles},
+				events:       make(chan loginp.FSEvent, len(test.expectedEvents)),
+				sameFileFunc: testSameFile,
+			}
+
+			w.watch(context.Background())
+			close(w.events)
+
+			actual := []loginp.FSEvent{}
+			for evt := range w.events {
+				actual = append(actual, evt)
+			}
+
+			if len(actual) != len(test.expectedEvents) {
+				t.Fatalf("expecting %d elements, got %d", len(test.expectedEvents), len(actual))
+			}
+			for i := range test.expectedEvents {
+				if test.expectedEvents[i] != actual[i] {
+					t.Errorf("element [%d] differ. Expecting:\n%#v\nGot:\n%#v\n", i, test.expectedEvents[i], actual[i])
+				}
+			}
+		})
+	}
+}
+
 type mockScanner struct {
 	files map[string]os.FileInfo
 }
diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go
index 2bd53c0e4d4..65381a2b73e 100644
--- a/filebeat/input/filestream/input.go
+++ b/filebeat/input/filestream/input.go
@@ -316,11 +316,11 @@ func (inp *filestream) readFromSource(
 		message, err := r.Next()
 		if err != nil {
 			if errors.Is(err, ErrFileTruncate) {
-				log.Infof("File was truncated. Begin reading file from offset 0. Path=%s", path)
+				log.Infof("File was truncated, nothing to read. Path='%s'", path)
 			} else if errors.Is(err, ErrClosed) {
-				log.Info("Reader was closed. Closing.")
+				log.Infof("Reader was closed. Closing. Path='%s'", path)
 			} else if errors.Is(err, io.EOF) {
-				log.Debugf("EOF has been reached. Closing.")
+				log.Debugf("EOF has been reached. Closing. Path='%s'", path)
 			} else {
 				log.Errorf("Read line error: %v", err)
 			}
diff --git a/filebeat/input/filestream/input_integration_test.go b/filebeat/input/filestream/input_integration_test.go
index c698ff47adf..79658970c3f 100644
--- a/filebeat/input/filestream/input_integration_test.go
+++ b/filebeat/input/filestream/input_integration_test.go
@@ -637,7 +637,7 @@ func TestFilestreamTruncateWithSymlink(t *testing.T) {
 	// remove symlink
 	env.mustRemoveFile(symlinkName)
 	env.mustTruncateFile(testlogName, 0)
-	env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0)
+	env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 10*time.Second)
 
 	moreLines := []byte("forth line\nfifth line\n")
 	env.mustWriteToFile(testlogName, moreLines)
@@ -704,7 +704,7 @@ func TestFilestreamTruncateCheckOffset(t *testing.T) {
 
 	env.mustTruncateFile(testlogName, 0)
 
-	env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0)
+	env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 10*time.Second)
 
 	cancelInput()
 	env.waitUntilInputStops()
@@ -716,10 +716,9 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) {
 	testlogName := "test.log"
 
 	inp := env.mustCreateInput(map[string]interface{}{
-		"id":                                 "fake-ID",
-		"paths":                              []string{env.abspath(testlogName)},
-		"prospector.scanner.check_interval":  "1ms",
-		"prospector.scanner.resend_on_touch": "true",
+		"id":                                "fake-ID",
+		"paths":                             []string{env.abspath(testlogName)},
+		"prospector.scanner.check_interval": "200ms",
 	})
 
 	testlines := []byte("first line\nsecond line\n")
@@ -743,7 +742,7 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) {
 
 	env.mustTruncateFile(testlogName, 0)
 
-	env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0)
+	env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 10*time.Second)
 
 	// all newly started client has to be cancelled so events can be processed
 	env.pipeline.cancelAllClients()
@@ -754,7 +753,7 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) {
 	env.mustWriteToFile(testlogName, truncatedTestLines)
 
 	env.waitUntilEventCount(3)
-	env.waitUntilOffsetInRegistry(testlogName, "fake-ID", len(truncatedTestLines))
+	env.waitUntilOffsetInRegistry(testlogName, "fake-ID", len(truncatedTestLines), 10*time.Second)
 
 	cancelInput()
 	env.waitUntilInputStops()
@@ -914,7 +913,7 @@ func TestFilestreamTruncate(t *testing.T) {
 	// remove symlink
 	env.mustRemoveFile(symlinkName)
 	env.mustTruncateFile(testlogName, 0)
-	env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0)
+	env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 10*time.Second)
 
 	// recreate symlink
 	env.mustSymlink(testlogName, symlinkName)
@@ -922,7 +921,7 @@ func TestFilestreamTruncate(t *testing.T) {
 	moreLines := []byte("forth line\nfifth line\n")
 	env.mustWriteToFile(testlogName, moreLines)
 
-	env.waitUntilOffsetInRegistry(testlogName, "fake-ID", len(moreLines))
+	env.waitUntilOffsetInRegistry(testlogName, "fake-ID", len(moreLines), 10*time.Second)
 
 	cancelInput()
 	env.waitUntilInputStops()
diff --git a/filebeat/input/filestream/internal/input-logfile/clean.go b/filebeat/input/filestream/internal/input-logfile/clean.go
index 15b37957979..14035f8de4d 100644
--- a/filebeat/input/filestream/internal/input-logfile/clean.go
+++ b/filebeat/input/filestream/internal/input-logfile/clean.go
@@ -18,6 +18,8 @@ package input_logfile
 
 import (
+	"context"
+	"errors"
 	"time"
 
 	"github.com/elastic/go-concert/timed"
@@ -48,8 +50,8 @@ func (c *cleaner) run(canceler unison.Canceler, store *store, interval time.Dura
 		gcStore(c.log, started, store)
 		return nil
 	})
-	if err != nil {
-		c.log.Errorf("failed to start the registry cleaning routine: %w", err)
+	if err != nil && !errors.Is(err, context.Canceled) {
+		c.log.Errorf("failed to start the registry cleaning routine: %s", err)
 	}
 }
 
diff --git a/filebeat/input/filestream/internal/input-logfile/harvester.go b/filebeat/input/filestream/internal/input-logfile/harvester.go
index bcd3420c01b..febaabbf42d 100644
--- a/filebeat/input/filestream/internal/input-logfile/harvester.go
+++ b/filebeat/input/filestream/internal/input-logfile/harvester.go
@@ -315,6 +315,7 @@ func lockResource(log *logp.Logger, resource *resource, canceler inputv2.Cancele
 	if !resource.lock.TryLock() {
 		log.Infof("Resource '%v' currently in use, waiting...", resource.key)
 		err := resource.lock.LockContext(canceler)
+		log.Infof("Resource '%v' finally released. Lock acquired", resource.key)
 		if err != nil {
 			log.Infof("Input for resource '%v' has been stopped while waiting", resource.key)
 			return err
diff --git a/filebeat/input/filestream/prospector.go b/filebeat/input/filestream/prospector.go
index 431542988e9..1e4a9c91c7f 100644
--- a/filebeat/input/filestream/prospector.go
+++ b/filebeat/input/filestream/prospector.go
@@ -198,7 +198,7 @@ func (p *fileProspector) onFSEvent(
 		group.Start(ctx, src)
 
 	case loginp.OpTruncate:
-		log.Debugf("File %s has been truncated", event.NewPath)
+		log.Debugf("File %s has been truncated, setting offset to 0", event.NewPath)
 
 		err := updater.ResetCursor(src, state{Offset: 0})
 		if err != nil {