From 8efb36685ea9213a7f0f9ad583698cdf85394d3e Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Tue, 9 Apr 2024 07:54:04 +0200 Subject: [PATCH] Fix Filestream store GC, entries are now removed when they TTL expires (#38488) The resources from Filestream registry have a counter to indicate how many 'owners' have got a hold of that resource, this counter was not correctly decremented. Because it never reached zero, no entry was ever removed from the in-memory store and even though the store GC would run periodically, no resource could be removed. That caused the in-memory store to be ever growing and the `op: remove` never to be seen in the registry log file. This commit fixes this bug by correctly calling Released in every resource that is retained. --- CHANGELOG.next.asciidoc | 1 + .../internal/input-logfile/clean.go | 4 + .../internal/input-logfile/store.go | 2 + filebeat/tests/integration/store_test.go | 145 ++++++++++++++++++ libbeat/tests/integration/framework.go | 39 ++++- 5 files changed, 189 insertions(+), 2 deletions(-) create mode 100644 filebeat/tests/integration/store_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 377c3c9f133..3b95d21f759 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -98,6 +98,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Rename `activity_guid` to `activity_id` in ETW input events to suit other Windows inputs. {pull}38530[38530] - Add missing provider registration and fix published entity for Active Directory entityanalytics provider. {pull}38645[38645] - Fix handling of un-parsed JSON in O365 module. {issue}37800[37800] {pull}38709[38709] +- Fix filestream's registry GC: registry entries are now removed from the in-memory and disk store when they're older than the set TTL {issue}36761[36761] {pull}38488[38488] *Heartbeat* diff --git a/filebeat/input/filestream/internal/input-logfile/clean.go b/filebeat/input/filestream/internal/input-logfile/clean.go index a96f5529895..c87997a0afa 100644 --- a/filebeat/input/filestream/internal/input-logfile/clean.go +++ b/filebeat/input/filestream/internal/input-logfile/clean.go @@ -79,6 +79,10 @@ func gcStore(log *logp.Logger, started time.Time, store *store) { if err := gcClean(store, keys); err != nil { log.Errorf("Failed to remove all entries from the registry: %+v", err) } + + // The main reason for this log entry is to enable tests that want to observe + // if the resources are correctly removed from the store. + log.Debugf("%d entries removed", len(keys)) } // gcFind searches the store of resources that can be removed. A set of keys to delete is returned. diff --git a/filebeat/input/filestream/internal/input-logfile/store.go b/filebeat/input/filestream/internal/input-logfile/store.go index 2b5b272fbcc..024ca5c9bfd 100644 --- a/filebeat/input/filestream/internal/input-logfile/store.go +++ b/filebeat/input/filestream/internal/input-logfile/store.go @@ -335,6 +335,7 @@ func (s *store) updateMetadata(key string, meta interface{}) error { resource.cursorMeta = meta s.writeState(resource) + resource.Release() return nil } @@ -384,6 +385,7 @@ func (s *store) remove(key string) error { return fmt.Errorf("resource '%s' not found", key) } s.UpdateTTL(resource, 0) + resource.Release() return nil } diff --git a/filebeat/tests/integration/store_test.go b/filebeat/tests/integration/store_test.go new file mode 100644 index 00000000000..c3ddb1ca759 --- /dev/null +++ b/filebeat/tests/integration/store_test.go @@ -0,0 +1,145 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build integration + +package integration + +import ( + "bufio" + "encoding/json" + "fmt" + "os" + "path" + "path/filepath" + "testing" + "time" + + "github.com/elastic/beats/v7/libbeat/tests/integration" +) + +var testStoreCfg = ` +filebeat.inputs: + - type: filestream + id: test-clean-removed + enabled: true + clean_removed: true + close.on_state_change.inactive: 8s + ignore_older: 9s + prospector.scanner.check_interval: 1s + paths: + - %s + +filebeat.registry: + cleanup_interval: 5s + flush: 1s + +queue.mem: + flush.min_events: 8 + flush.timeout: 0.1s + +path.home: %s + +output.file: + path: ${path.home} + filename: "output-file" + rotate_every_kb: 10000 + +logging: + level: debug + selectors: + - input + - input.filestream +` + +func TestStore(t *testing.T) { + numLogFiles := 10 + filebeat := integration.NewBeat( + t, + "filebeat", + "../../filebeat.test", + ) + tempDir := filebeat.TempDir() + + // 1. Create some log files and write data to them + logsFolder := filepath.Join(tempDir, "logs") + if err := os.MkdirAll(logsFolder, 0755); err != nil { + t.Fatalf("could not create logs folder '%s': %s", logsFolder, err) + } + + for i := 0; i < numLogFiles; i++ { + logFile := path.Join(logsFolder, fmt.Sprintf("log-%d.log", i)) + integration.GenerateLogFile(t, logFile, 10, false) + } + logsFolderGlob := filepath.Join(logsFolder, "*") + filebeat.WriteConfigFile(fmt.Sprintf(testStoreCfg, logsFolderGlob, tempDir)) + + // 2. Ingest the file and stop Filebeat + filebeat.Start() + + for i := 0; i < numLogFiles; i++ { + // Files can be ingested out of order, so we cannot specify their path. + // There will be more than one log line per file, but that at least gives us + // some assurance the files were read + filebeat.WaitForLogs("Closing reader of filestream", 30*time.Second, "Filebeat did not finish reading the log file") + } + + // 3. Remove files so their state can be cleaned + if err := os.RemoveAll(logsFolder); err != nil { + t.Fatalf("could not remove logs folder '%s': %s", logsFolder, err) + } + filebeat.WaitForLogs(fmt.Sprintf("%d entries removed", numLogFiles), 30*time.Second, "store entries not removed") + filebeat.Stop() + + registryLogFile := filepath.Join(tempDir, "data/registry/filebeat/log.json") + readFilestreamRegistryLog(t, registryLogFile, "remove", 10) +} + +func readFilestreamRegistryLog(t *testing.T, path, op string, expectedCount int) { + file, err := os.Open(path) + if err != nil { + t.Fatalf("could not open file '%s': %s", path, err) + } + + s := bufio.NewScanner(file) + count := 0 + for s.Scan() { + line := s.Bytes() + + registryOp := struct { + Op string `json:"op"` + ID int `json:"id"` + }{} + + if err := json.Unmarshal(line, ®istryOp); err != nil { + t.Fatalf("could not read line '%s': %s", string(line), err) + } + + // Skips registry log entries that are not operation count + if registryOp.Op == "" { + continue + } + + if registryOp.Op == op { + count++ + } + } + + if count != expectedCount { + t.Errorf("expecting %d '%s' operations, got %d instead", expectedCount, op, count) + } +} diff --git a/libbeat/tests/integration/framework.go b/libbeat/tests/integration/framework.go index 9657fbaeaff..6017c63fd87 100644 --- a/libbeat/tests/integration/framework.go +++ b/libbeat/tests/integration/framework.go @@ -26,7 +26,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "net/http" "net/url" "os" @@ -115,6 +114,7 @@ func NewBeat(t *testing.T, beatName, binary string, args ...string) *BeatProc { "--path.logs", tempDir, "-E", "logging.to_files=true", "-E", "logging.files.rotateeverybytes=104857600", // About 100MB + "-E", "logging.files.rotateonstartup=false", }, args...), tempDir: tempDir, beatName: beatName, @@ -524,7 +524,7 @@ func (b *BeatProc) LoadMeta() (Meta, error) { } defer metaFile.Close() - metaBytes, err := ioutil.ReadAll(metaFile) + metaBytes, err := io.ReadAll(metaFile) require.NoError(b.t, err, "error reading meta file") err = json.Unmarshal(metaBytes, &m) require.NoError(b.t, err, "error unmarshalling meta data") @@ -685,3 +685,38 @@ func readLastNBytes(filename string, numBytes int64) ([]byte, error) { } return io.ReadAll(f) } + +// GenerateLogFile writes count lines to path, each line is 50 bytes. +// Each line contains the current time (RFC3339) and a counter +func GenerateLogFile(t *testing.T, path string, count int, append bool) { + var file *os.File + var err error + if !append { + file, err = os.Create(path) + if err != nil { + t.Fatalf("could not create file '%s': %s", path, err) + } + } else { + file, err = os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666) + if err != nil { + t.Fatalf("could not open or create file: '%s': %s", path, err) + } + } + + defer func() { + if err := file.Close(); err != nil { + t.Fatalf("could not close file: %s", err) + } + }() + defer func() { + if err := file.Sync(); err != nil { + t.Fatalf("could not sync file: %s", err) + } + }() + now := time.Now().Format(time.RFC3339) + for i := 0; i < count; i++ { + if _, err := fmt.Fprintf(file, "%s %13d\n", now, i); err != nil { + t.Fatalf("could not write line %d to file: %s", count+1, err) + } + } +}