Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix filestream flaky truncated file test #35759

Merged
merged 6 commits into from
Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 71 additions & 11 deletions filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ package filestream

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -64,6 +66,24 @@ type registryEntry struct {
}

func newInputTestingEnvironment(t *testing.T) *inputTestingEnvironment {
if err := logp.DevelopmentSetup(logp.ToObserverOutput()); err != nil {
t.Fatalf("error setting up dev logging: %s", err)
}

t.Cleanup(func() {
if t.Failed() {
t.Logf("Debug Logs:\n")
for _, log := range logp.ObserverLogs().TakeAll() {
data, err := json.Marshal(log)
if err != nil {
t.Errorf("failed encoding log as JSON: %s", err)
}
t.Logf("%s", string(data))
}
return
}
})

return &inputTestingEnvironment{
t: t,
workingDir: t.TempDir(),
Expand Down Expand Up @@ -194,6 +214,8 @@ func (e *inputTestingEnvironment) requireRegistryEntryCount(expectedCount int) {
// requireOffsetInRegistry checks if the expected offset is set for a file.
func (e *inputTestingEnvironment) requireOffsetInRegistry(filename, inputID string, expectedOffset int) {
e.t.Helper()
var offsetStr strings.Builder

filepath := e.abspath(filename)
fi, err := os.Stat(filepath)
if err != nil {
Expand All @@ -202,16 +224,23 @@ func (e *inputTestingEnvironment) requireOffsetInRegistry(filename, inputID stri

id := getIDFromPath(filepath, inputID, fi)
var entry registryEntry
require.Eventually(e.t, func() bool {
require.Eventuallyf(e.t, func() bool {
offsetStr.Reset()

entry, err = e.getRegistryState(id)
if err != nil {
return true
e.t.Fatalf("could not get state for '%s' from registry, err: %s", id, err)
}

fmt.Fprint(&offsetStr, entry.Cursor.Offset)

return expectedOffset == entry.Cursor.Offset
}, time.Second, time.Millisecond)
require.NoError(e.t, err)
require.Equal(e.t, expectedOffset, entry.Cursor.Offset)
},
time.Second,
100*time.Millisecond,
"expected offset: '%d', cursor offset: '%s'",
expectedOffset,
&offsetStr)
}

// requireMetaInRegistry checks if the expected metadata is saved to the registry.
Expand Down Expand Up @@ -251,20 +280,51 @@ func requireMetadataEquals(one, other fileMeta) bool {
}

// waitUntilOffsetInRegistry waits for the expected offset is set for a file.
func (e *inputTestingEnvironment) waitUntilOffsetInRegistry(filename, inputID string, expectedOffset int) {
// If timeout is reached or there is an error getting the state from the
// registry, the test fails
func (e *inputTestingEnvironment) waitUntilOffsetInRegistry(
filename, inputID string,
expectedOffset int,
timeout time.Duration) {

var cursorString strings.Builder
var fileSizeString strings.Builder

filepath := e.abspath(filename)
fi, err := os.Stat(filepath)
if err != nil {
e.t.Fatalf("cannot stat file when cheking for offset: %+v", err)
}

id := getIDFromPath(filepath, inputID, fi)
entry, err := e.getRegistryState(id)
for err != nil || entry.Cursor.Offset != expectedOffset {
entry, err = e.getRegistryState(id)
}

require.Equal(e.t, expectedOffset, entry.Cursor.Offset)
require.Eventuallyf(e.t, func() bool {
cursorString.Reset()
fileSizeString.Reset()

entry, err := e.getRegistryState(id)
if err != nil {
e.t.Fatalf(
"error getting state for ID '%s' from the registry, err: %s",
id, err)
}

fi, err := os.Stat(filepath)
if err != nil {
e.t.Fatalf("could not stat '%s', err: %s", filepath, err)
}

fileSizeString.WriteString(fmt.Sprint(fi.Size()))
cursorString.WriteString(fmt.Sprint(entry.Cursor.Offset))

return entry.Cursor.Offset == expectedOffset
},
timeout,
100*time.Millisecond,
"expected offset: '%d', cursor offset: '%s', file size: '%s'",
expectedOffset,
&cursorString,
&fileSizeString)
}

func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename, inputID string) {
Expand Down
4 changes: 3 additions & 1 deletion filebeat/input/filestream/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,9 @@ func (f *logFile) periodicStateCheck(ctx unison.Canceler) {
return nil
})
if err != nil {
f.log.Errorf("failed to schedule a periodic state check: %w", err)
if !errors.Is(err, context.Canceled) {
f.log.Errorf("failed to schedule a periodic state check: %s", err)
}
}
}

Expand Down
10 changes: 7 additions & 3 deletions filebeat/input/filestream/fswatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,12 @@ func (w *fileWatcher) watch(ctx unison.Canceler) {
}

// if the two infos belong to the same file and it has been modified
// if the size is smaller than before, it is truncated, if bigger, it is a write event
if prevInfo.ModTime() != info.ModTime() {
// if the size is smaller than before, it is truncated, if bigger, it is a write event.
// It might happen that a file is truncated and then more data is added, both
// within the same second, this will make the reader stop, but a new one will not
// start because the modification data is the same, to avoid this situation,
// we also check for size changes here.
if prevInfo.ModTime() != info.ModTime() || prevInfo.Size() != info.Size() {
belimawr marked this conversation as resolved.
Show resolved Hide resolved
if prevInfo.Size() > info.Size() || w.resendOnModTime && prevInfo.Size() == info.Size() {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -392,7 +396,7 @@ func (s *fileScanner) isOriginalAndSymlinkConfigured(file string, uniqFileID map
}
fileID := file_helper.GetOSState(fileInfo).String()
if finfo, exists := uniqFileID[fileID]; exists {
s.log.Info("Same file found as symlink and original. Skipping file: %s (as it same as %s)", file, finfo.Name())
s.log.Infof("Same file found as symlink and original. Skipping file: %s (as it same as %s)", file, finfo.Name())
return true
}
uniqFileID[fileID] = fileInfo
Expand Down
71 changes: 71 additions & 0 deletions filebeat/input/filestream/fswatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,77 @@ func TestFileWatchNewDeleteModified(t *testing.T) {
}
}

func TestFileWatcherTruncate(t *testing.T) {
oldTs := time.Now()
newTs := oldTs.Add(time.Second)
testCases := map[string]struct {
prevFiles map[string]os.FileInfo
nextFiles map[string]os.FileInfo
expectedEvents []loginp.FSEvent
}{
"truncated file, only size changes": {
prevFiles: map[string]os.FileInfo{
"path": testFileInfo{"path", 42, oldTs, nil},
},
nextFiles: map[string]os.FileInfo{
"path": testFileInfo{"path", 0, oldTs, nil},
},
expectedEvents: []loginp.FSEvent{
{Op: loginp.OpTruncate, OldPath: "path", NewPath: "path", Info: testFileInfo{"path", 0, oldTs, nil}},
},
},
"truncated file, mod time and size changes": {
prevFiles: map[string]os.FileInfo{
"path": testFileInfo{"path", 42, oldTs, nil},
},
nextFiles: map[string]os.FileInfo{
"path": testFileInfo{"path", 0, newTs, nil},
},
expectedEvents: []loginp.FSEvent{
{Op: loginp.OpTruncate, OldPath: "path", NewPath: "path", Info: testFileInfo{"path", 0, newTs, nil}},
},
},
"no file change": {
prevFiles: map[string]os.FileInfo{
"path": testFileInfo{"path", 42, oldTs, nil},
},
nextFiles: map[string]os.FileInfo{
"path": testFileInfo{"path", 42, oldTs, nil},
},
expectedEvents: []loginp.FSEvent{},
},
}

for name, test := range testCases {
t.Run(name, func(t *testing.T) {
w := fileWatcher{
log: logp.L(),
prev: test.prevFiles,
scanner: &mockScanner{test.nextFiles},
events: make(chan loginp.FSEvent, len(test.expectedEvents)),
sameFileFunc: testSameFile,
}

w.watch(context.Background())
close(w.events)

actual := []loginp.FSEvent{}
for evt := range w.events {
actual = append(actual, evt)
}

if len(actual) != len(test.expectedEvents) {
t.Fatalf("expecting %d elements, got %d", len(test.expectedEvents), len(actual))
}
for i := range test.expectedEvents {
if test.expectedEvents[i] != actual[i] {
t.Errorf("element [%d] differ. Expecting:\n%#v\nGot:\n%#v\n", i, test.expectedEvents[i], actual[i])
}
}
})
}
}

type mockScanner struct {
files map[string]os.FileInfo
}
Expand Down
6 changes: 3 additions & 3 deletions filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,11 +316,11 @@ func (inp *filestream) readFromSource(
message, err := r.Next()
if err != nil {
if errors.Is(err, ErrFileTruncate) {
log.Infof("File was truncated. Begin reading file from offset 0. Path=%s", path)
log.Infof("File was truncated, nothing to read. Path='%s'", path)
} else if errors.Is(err, ErrClosed) {
log.Info("Reader was closed. Closing.")
log.Infof("Reader was closed. Closing. Path='%s'", path)
} else if errors.Is(err, io.EOF) {
log.Debugf("EOF has been reached. Closing.")
log.Debugf("EOF has been reached. Closing. Path='%s'", path)
} else {
log.Errorf("Read line error: %v", err)
}
Expand Down
19 changes: 9 additions & 10 deletions filebeat/input/filestream/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ func TestFilestreamTruncateWithSymlink(t *testing.T) {
// remove symlink
env.mustRemoveFile(symlinkName)
env.mustTruncateFile(testlogName, 0)
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0)
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 10*time.Second)

moreLines := []byte("forth line\nfifth line\n")
env.mustWriteToFile(testlogName, moreLines)
Expand Down Expand Up @@ -704,7 +704,7 @@ func TestFilestreamTruncateCheckOffset(t *testing.T) {

env.mustTruncateFile(testlogName, 0)

env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0)
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 10*time.Second)

cancelInput()
env.waitUntilInputStops()
Expand All @@ -716,10 +716,9 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) {

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"id": "fake-ID",
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.resend_on_touch": "true",
"id": "fake-ID",
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "200ms",
})

testlines := []byte("first line\nsecond line\n")
Expand All @@ -743,7 +742,7 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) {

env.mustTruncateFile(testlogName, 0)

env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0)
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 10*time.Second)

// all newly started client has to be cancelled so events can be processed
env.pipeline.cancelAllClients()
Expand All @@ -754,7 +753,7 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) {
env.mustWriteToFile(testlogName, truncatedTestLines)

env.waitUntilEventCount(3)
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", len(truncatedTestLines))
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", len(truncatedTestLines), 10*time.Second)

cancelInput()
env.waitUntilInputStops()
Expand Down Expand Up @@ -914,15 +913,15 @@ func TestFilestreamTruncate(t *testing.T) {
// remove symlink
env.mustRemoveFile(symlinkName)
env.mustTruncateFile(testlogName, 0)
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0)
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 10*time.Second)

// recreate symlink
env.mustSymlink(testlogName, symlinkName)

moreLines := []byte("forth line\nfifth line\n")
env.mustWriteToFile(testlogName, moreLines)

env.waitUntilOffsetInRegistry(testlogName, "fake-ID", len(moreLines))
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", len(moreLines), 10*time.Second)

cancelInput()
env.waitUntilInputStops()
Expand Down
6 changes: 4 additions & 2 deletions filebeat/input/filestream/internal/input-logfile/clean.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package input_logfile

import (
"context"
"errors"
"time"

"github.com/elastic/go-concert/timed"
Expand Down Expand Up @@ -48,8 +50,8 @@ func (c *cleaner) run(canceler unison.Canceler, store *store, interval time.Dura
gcStore(c.log, started, store)
return nil
})
if err != nil {
c.log.Errorf("failed to start the registry cleaning routine: %w", err)
if err != nil && !errors.Is(err, context.Canceled) {
c.log.Errorf("failed to start the registry cleaning routine: %s", err)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ func lockResource(log *logp.Logger, resource *resource, canceler inputv2.Cancele
if !resource.lock.TryLock() {
log.Infof("Resource '%v' currently in use, waiting...", resource.key)
err := resource.lock.LockContext(canceler)
log.Infof("Resource '%v' finally released. Lock acquired", resource.key)
if err != nil {
log.Infof("Input for resource '%v' has been stopped while waiting", resource.key)
return err
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/filestream/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (p *fileProspector) onFSEvent(
group.Start(ctx, src)

case loginp.OpTruncate:
log.Debugf("File %s has been truncated", event.NewPath)
log.Debugf("File %s has been truncated setting offset to 0", event.NewPath)

err := updater.ResetCursor(src, state{Offset: 0})
if err != nil {
Expand Down
Loading