Skip to content

Commit f55d831

Browse files
mergify[bot]leehinmanmauri870
authored
[8.19](backport #46779) [fbreceiver] partial fix for global paths (#47829)
* [fbreceiver] partial fix for global paths (#46779) partial fix of global paths for filebeat receiver (cherry picked from commit e55e0be) # Conflicts: # filebeat/beater/crawler.go # libbeat/cfgfile/reload_test.go # libbeat/cmd/instance/beat.go # libbeat/otelbeat/oteltest/oteltest.go # x-pack/filebeat/fbreceiver/receiver_test.go * fix conflicts --------- Co-authored-by: Lee E Hinman <57081003+leehinman@users.noreply.github.com> Co-authored-by: Mauri de Souza Meneguzzo <mauri870@gmail.com>
1 parent 78f0216 commit f55d831

File tree

34 files changed

+433
-270
lines changed

34 files changed

+433
-270
lines changed

filebeat/autodiscover/builder/hints/logs.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
conf "github.com/elastic/elastic-agent-libs/config"
3535
"github.com/elastic/elastic-agent-libs/logp"
3636
"github.com/elastic/elastic-agent-libs/mapstr"
37+
"github.com/elastic/elastic-agent-libs/paths"
3738
)
3839

3940
const (
@@ -68,7 +69,7 @@ func NewLogHints(cfg *conf.C, logger *logp.Logger) (autodiscover.Builder, error)
6869
return nil, fmt.Errorf("unable to unpack hints config due to error: %w", err)
6970
}
7071

71-
moduleRegistry, err := fileset.NewModuleRegistry(nil, beat.Info{Logger: logger}, false, fileset.FilesetOverrides{})
72+
moduleRegistry, err := fileset.NewModuleRegistry(nil, beat.Info{Logger: logger}, false, fileset.FilesetOverrides{}, paths.Paths)
7273
if err != nil {
7374
return nil, err
7475
}
@@ -155,7 +156,6 @@ func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*conf
155156
} else {
156157
shouldPut(tempCfg, json, jsonOpts, l.log)
157158
}
158-
159159
}
160160
// Merge config template with the configs from the annotations
161161
// AppendValues option is used to append arrays from annotations to existing arrays while merging

filebeat/beater/crawler.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/elastic/beats/v7/libbeat/cfgfile"
2929
conf "github.com/elastic/elastic-agent-libs/config"
3030
"github.com/elastic/elastic-agent-libs/logp"
31+
"github.com/elastic/elastic-agent-libs/paths"
3132
)
3233

3334
type crawler struct {
@@ -41,6 +42,7 @@ type crawler struct {
4142
inputReloader *cfgfile.Reloader
4243
once bool
4344
beatDone chan struct{}
45+
beatPaths *paths.Path
4446
}
4547

4648
func newCrawler(
@@ -49,6 +51,7 @@ func newCrawler(
4951
beatDone chan struct{},
5052
once bool,
5153
logger *logp.Logger,
54+
beatPaths *paths.Path,
5255
) (*crawler, error) {
5356
return &crawler{
5457
log: logger.Named("crawler"),
@@ -58,6 +61,7 @@ func newCrawler(
5861
inputConfigs: inputConfigs,
5962
once: once,
6063
beatDone: beatDone,
64+
beatPaths: beatPaths,
6165
}, nil
6266
}
6367

@@ -80,14 +84,14 @@ func (c *crawler) Start(
8084
}
8185

8286
if configInputs.Enabled() {
83-
c.inputReloader = cfgfile.NewReloader(log, pipeline, configInputs)
87+
c.inputReloader = cfgfile.NewReloader(log.Named("input.reloader"), pipeline, configInputs, c.beatPaths)
8488
if err := c.inputReloader.Check(c.inputsFactory); err != nil {
8589
return fmt.Errorf("creating input reloader failed: %w", err)
8690
}
8791
}
8892

8993
if configModules.Enabled() {
90-
c.modulesReloader = cfgfile.NewReloader(log, pipeline, configModules)
94+
c.modulesReloader = cfgfile.NewReloader(log.Named("module.reloader"), pipeline, configModules, c.beatPaths)
9195
if err := c.modulesReloader.Check(c.modulesFactory); err != nil {
9296
return fmt.Errorf("creating module reloader failed: %w", err)
9397
}
@@ -113,7 +117,6 @@ func (c *crawler) startInput(
113117
pipeline beat.PipelineConnector,
114118
config *conf.C,
115119
) error {
116-
117120
if !config.Enabled() {
118121
c.log.Infof("input disabled, skipping it")
119122
return nil

filebeat/beater/diagnostics.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,12 @@ func getRegexpsForRegistryFiles() ([]*regexp.Regexp, error) {
3939

4040
registryFileRegExps := []*regexp.Regexp{}
4141
preFilesList := [][]string{
42-
[]string{"^registry$"},
43-
[]string{"^registry", "filebeat$"},
44-
[]string{"^registry", "filebeat", "meta\\.json$"},
45-
[]string{"^registry", "filebeat", "log\\.json$"},
46-
[]string{"^registry", "filebeat", "active\\.dat$"},
47-
[]string{"^registry", "filebeat", "[[:digit:]]*\\.json$"},
42+
{"^registry$"},
43+
{"^registry", "filebeat$"},
44+
{"^registry", "filebeat", "meta\\.json$"},
45+
{"^registry", "filebeat", "log\\.json$"},
46+
{"^registry", "filebeat", "active\\.dat$"},
47+
{"^registry", "filebeat", "[[:digit:]]*\\.json$"},
4848
}
4949

5050
for _, lst := range preFilesList {
@@ -70,12 +70,12 @@ func getRegexpsForRegistryFiles() ([]*regexp.Regexp, error) {
7070
return registryFileRegExps, nil
7171
}
7272

73-
func gzipRegistry(logger *logp.Logger) func() []byte {
73+
func gzipRegistry(logger *logp.Logger, beatPaths *paths.Path) func() []byte {
7474
logger = logger.Named("diagnostics")
7575

7676
return func() []byte {
7777
buf := bytes.Buffer{}
78-
dataPath := paths.Resolve(paths.Data, "")
78+
dataPath := beatPaths.Resolve(paths.Data, "")
7979
registryPath := filepath.Join(dataPath, "registry")
8080
f, err := os.CreateTemp("", "filebeat-registry-*.tar")
8181
if err != nil {

filebeat/beater/filebeat.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea
109109
EnableAllFilesets: enableAllFilesets,
110110
ForceEnableModuleFilesets: forceEnableModuleFilesets,
111111
}
112-
moduleRegistry, err := fileset.NewModuleRegistry(config.Modules, b.Info, true, filesetOverrides)
112+
moduleRegistry, err := fileset.NewModuleRegistry(config.Modules, b.Info, true, filesetOverrides, b.Paths)
113113
if err != nil {
114114
return nil, err
115115
}
@@ -197,7 +197,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
197197
ForceEnableModuleFilesets: forceEnableModuleFilesets,
198198
}
199199

200-
modulesFactory := fileset.NewSetupFactory(b.Info, pipelineLoaderFactory, filesetOverrides)
200+
modulesFactory := fileset.NewSetupFactory(b.Info, pipelineLoaderFactory, filesetOverrides, b.Paths)
201201
if fb.config.ConfigModules.Enabled() {
202202
if enableAllFilesets {
203203
// All module configs need to be loaded to enable all the filesets
@@ -207,7 +207,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
207207
newPath := strings.TrimSuffix(origPath, ".yml")
208208
_ = fb.config.ConfigModules.SetString("path", -1, newPath)
209209
}
210-
modulesLoader := cfgfile.NewReloader(fb.logger.Named("module.reloader"), fb.pipeline, fb.config.ConfigModules)
210+
modulesLoader := cfgfile.NewReloader(fb.logger.Named("module.reloader"), fb.pipeline, fb.config.ConfigModules, b.Paths)
211211
modulesLoader.Load(modulesFactory)
212212
}
213213

@@ -266,7 +266,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
266266
"Filebeat's registry",
267267
"registry.tar.gz",
268268
"application/octet-stream",
269-
gzipRegistry(b.Info.Logger))
269+
gzipRegistry(b.Info.Logger, b.Paths))
270270
}
271271

272272
if !fb.moduleRegistry.Empty() {
@@ -288,7 +288,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
288288
}
289289
finishedLogger := newFinishedLogger(wgEvents)
290290

291-
registryMigrator := registrar.NewMigrator(config.Registry, fb.logger)
291+
registryMigrator := registrar.NewMigrator(config.Registry, fb.logger, b.Paths)
292292
if err := registryMigrator.Run(); err != nil {
293293
fb.logger.Errorf("Failed to migrate registry file: %+v", err)
294294
return err
@@ -301,7 +301,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
301301
cn()
302302
}()
303303

304-
stateStore, err := openStateStore(ctx, b.Info, fb.logger.Named("filebeat"), config.Registry)
304+
stateStore, err := openStateStore(ctx, b.Info, fb.logger.Named("filebeat"), config.Registry, b.Paths)
305305
if err != nil {
306306
fb.logger.Errorf("Failed to open state store: %+v", err)
307307
return err
@@ -361,7 +361,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
361361
pipelineConnector := channel.NewOutletFactory(outDone).Create
362362

363363
inputsLogger := fb.logger.Named("input")
364-
v2Inputs := fb.pluginFactory(b.Info, inputsLogger, stateStore, paths.Paths)
364+
v2Inputs := fb.pluginFactory(b.Info, inputsLogger, stateStore, b.Paths)
365365
v2InputLoader, err := v2.NewLoader(inputsLogger, v2Inputs, "type", cfg.DefaultType)
366366
if err != nil {
367367
panic(err) // loader detected invalid state.
@@ -403,8 +403,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
403403
fb.logger.Warn(pipelinesWarning)
404404
}
405405
}
406-
moduleLoader := fileset.NewFactory(inputLoader, b.Info, pipelineLoaderFactory, config.OverwritePipelines)
407-
crawler, err := newCrawler(inputLoader, moduleLoader, config.Inputs, fb.done, *once, fb.logger)
406+
moduleLoader := fileset.NewFactory(inputLoader, b.Info, pipelineLoaderFactory, config.OverwritePipelines, b.Paths)
407+
crawler, err := newCrawler(inputLoader, moduleLoader, config.Inputs, fb.done, *once, fb.logger, b.Paths)
408408
if err != nil {
409409
fb.logger.Errorf("Could not init crawler: %v", err)
410410
return err

filebeat/beater/store.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type filebeatStore struct {
4646
notifier *es.Notifier
4747
}
4848

49-
func openStateStore(ctx context.Context, info beat.Info, logger *logp.Logger, cfg config.Registry) (*filebeatStore, error) {
49+
func openStateStore(ctx context.Context, info beat.Info, logger *logp.Logger, cfg config.Registry, beatPaths *paths.Path) (*filebeatStore, error) {
5050
var (
5151
reg backend.Registry
5252
err error
@@ -61,7 +61,7 @@ func openStateStore(ctx context.Context, info beat.Info, logger *logp.Logger, cf
6161
}
6262

6363
reg, err = memlog.New(logger, memlog.Settings{
64-
Root: paths.Resolve(paths.Data, cfg.Path),
64+
Root: beatPaths.Resolve(paths.Data, cfg.Path),
6565
FileMode: cfg.Permissions,
6666
})
6767
if err != nil {

filebeat/fileset/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,10 @@ func NewFilesetConfig(cfg *conf.C) (*FilesetConfig, error) {
6060
// mergePathDefaults returns a copy of c containing the path variables that must
6161
// be available for variable expansion in module configuration (e.g. it enables
6262
// the use of ${path.config} in module config).
63-
func mergePathDefaults(c *conf.C) (*conf.C, error) {
63+
func mergePathDefaults(c *conf.C, beatPaths *paths.Path) (*conf.C, error) {
6464
defaults := conf.MustNewConfigFrom(map[string]interface{}{
6565
"path": map[string]interface{}{
66-
"home": paths.Paths.Home,
66+
"home": beatPaths.Home,
6767
"config": "${path.home}",
6868
"data": filepath.Join("${path.home}", "data"),
6969
"logs": filepath.Join("${path.home}", "logs"),

filebeat/fileset/factory.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,13 @@ import (
3131
conf "github.com/elastic/elastic-agent-libs/config"
3232
"github.com/elastic/elastic-agent-libs/logp"
3333
"github.com/elastic/elastic-agent-libs/monitoring"
34+
"github.com/elastic/elastic-agent-libs/paths"
3435
)
3536

36-
var moduleList = monitoring.NewUniqueList()
37-
var moduleListMetricsOnce sync.Once
37+
var (
38+
moduleList = monitoring.NewUniqueList()
39+
moduleListMetricsOnce sync.Once
40+
)
3841

3942
// RegisterMonitoringModules registers the modules list with the monitoring system.
4043
func RegisterMonitoringModules(namespace string) {
@@ -50,6 +53,7 @@ type Factory struct {
5053
overwritePipelines bool
5154
pipelineCallbackID uuid.UUID
5255
inputFactory cfgfile.RunnerFactory
56+
beatPaths *paths.Path
5357
}
5458

5559
// Wrap an array of inputs and implements cfgfile.Runner interface
@@ -69,13 +73,15 @@ func NewFactory(
6973
beatInfo beat.Info,
7074
pipelineLoaderFactory PipelineLoaderFactory,
7175
overwritePipelines bool,
76+
beatPaths *paths.Path,
7277
) *Factory {
7378
return &Factory{
7479
inputFactory: inputFactory,
7580
beatInfo: beatInfo,
7681
pipelineLoaderFactory: pipelineLoaderFactory,
7782
pipelineCallbackID: uuid.Nil,
7883
overwritePipelines: overwritePipelines,
84+
beatPaths: beatPaths,
7985
}
8086
}
8187

@@ -134,7 +140,7 @@ func (f *Factory) CheckConfig(c *conf.C) error {
134140
// createRegistry starts a registry for a set of filesets, it returns the registry and
135141
// its input configurations
136142
func (f *Factory) createRegistry(c *conf.C) (*ModuleRegistry, []*conf.C, error) {
137-
m, err := NewModuleRegistry([]*conf.C{c}, f.beatInfo, false, FilesetOverrides{})
143+
m, err := NewModuleRegistry([]*conf.C{c}, f.beatInfo, false, FilesetOverrides{}, f.beatPaths)
138144
if err != nil {
139145
return nil, nil, err
140146
}

filebeat/fileset/fileset.go

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
4141
conf "github.com/elastic/elastic-agent-libs/config"
4242
"github.com/elastic/elastic-agent-libs/logp"
43+
"github.com/elastic/elastic-agent-libs/paths"
4344
"github.com/elastic/elastic-agent-libs/version"
4445
)
4546

@@ -53,6 +54,7 @@ type Fileset struct {
5354
vars map[string]interface{}
5455
pipelineIDs []string
5556
logger *logp.Logger
57+
beatPaths *paths.Path
5658
}
5759

5860
type pipeline struct {
@@ -67,6 +69,7 @@ func New(
6769
mname string,
6870
fcfg *FilesetConfig,
6971
logger *logp.Logger,
72+
beatPaths *paths.Path,
7073
) (*Fileset, error,
7174
) {
7275
modulePath := filepath.Join(modulesPath, mname)
@@ -80,6 +83,7 @@ func New(
8083
fcfg: fcfg,
8184
modulePath: modulePath,
8285
logger: logger,
86+
beatPaths: beatPaths,
8387
}, nil
8488
}
8589

@@ -168,7 +172,7 @@ func (fs *Fileset) evaluateVars(info beat.Info) (map[string]interface{}, error)
168172
var exists bool
169173
name, exists := vals["name"].(string)
170174
if !exists {
171-
return nil, fmt.Errorf("Variable doesn't have a string 'name' key")
175+
return nil, fmt.Errorf("variable doesn't have a string 'name' key")
172176
}
173177

174178
// Variables are not required to have a default. Templates should
@@ -207,28 +211,31 @@ func (fs *Fileset) turnOffElasticsearchVars(vars map[string]interface{}, esVersi
207211
}
208212

209213
if !esVersion.IsValid() {
210-
return vars, errors.New("Unknown Elasticsearch version")
214+
return vars, errors.New("unknown Elasticsearch version")
211215
}
212216

213217
for _, vals := range fs.manifest.Vars {
214218
var ok bool
215219
name, ok := vals["name"].(string)
216220
if !ok {
217-
return nil, fmt.Errorf("Variable doesn't have a string 'name' key")
221+
return nil, fmt.Errorf("variable doesn't have a string 'name' key")
218222
}
219223

220224
minESVersion, ok := vals["min_elasticsearch_version"].(map[string]interface{})
221225
if ok {
222-
minVersion, err := version.New(minESVersion["version"].(string))
223-
if err != nil {
224-
return vars, fmt.Errorf("Error parsing version %s: %w", minESVersion["version"].(string), err)
225-
}
226+
versionString, ok := minESVersion["version"].(string)
227+
if ok {
228+
minVersion, err := version.New(versionString)
229+
if err != nil {
230+
return vars, fmt.Errorf("Error parsing version %s: %w", versionString, err)
231+
}
226232

227-
fs.logger.Named("fileset").Debugf("Comparing ES version %s with requirement of %s", esVersion.String(), minVersion)
233+
fs.logger.Named("fileset").Debugf("Comparing ES version %s with requirement of %s", esVersion.String(), minVersion)
228234

229-
if esVersion.LessThan(minVersion) {
230-
retVars[name] = minESVersion["value"]
231-
fs.logger.Infof("Setting var %s (%s) to %v because Elasticsearch version is %s", name, fs, minESVersion["value"], esVersion.String())
235+
if esVersion.LessThan(minVersion) {
236+
retVars[name] = minESVersion["value"]
237+
fs.logger.Infof("Setting var %s (%s) to %v because Elasticsearch version is %s", name, fs, minESVersion["value"], esVersion.String())
238+
}
232239
}
233240
}
234241
}
@@ -312,11 +319,11 @@ func getTemplateFunctions(vars map[string]interface{}) (template.FuncMap, error)
312319
},
313320
"IngestPipeline": func(shortID string) string {
314321
return FormatPipelineID(
315-
builtinVars["prefix"].(string),
316-
builtinVars["module"].(string),
317-
builtinVars["fileset"].(string),
322+
builtinVars["prefix"].(string), //nolint:errcheck //keep behavior for now
323+
builtinVars["module"].(string), //nolint:errcheck //keep behavior for now
324+
builtinVars["fileset"].(string), //nolint:errcheck //keep behavior for now
318325
shortID,
319-
builtinVars["beatVersion"].(string),
326+
builtinVars["beatVersion"].(string), //nolint:errcheck //keep behavior for now
320327
)
321328
},
322329
}, nil
@@ -368,7 +375,7 @@ func (fs *Fileset) getInputConfig() (*conf.C, error) {
368375
return nil, fmt.Errorf("Error reading input config: %w", err)
369376
}
370377

371-
cfg, err = mergePathDefaults(cfg)
378+
cfg, err = mergePathDefaults(cfg, fs.beatPaths)
372379
if err != nil {
373380
return nil, err
374381
}
@@ -461,15 +468,15 @@ func (fs *Fileset) GetPipelines(esVersion version.V) (pipelines []pipeline, err
461468
}
462469
newContent, err := FixYAMLMaps(content)
463470
if err != nil {
464-
return nil, fmt.Errorf("Failed to sanitize the YAML pipeline file: %s: %w", path, err)
471+
return nil, fmt.Errorf("failed to sanitize the YAML pipeline file: %s: %w", path, err)
465472
}
466473
var ok bool
467474
content, ok = newContent.(map[string]interface{})
468475
if !ok {
469476
return nil, errors.New("cannot convert newContent to map[string]interface{}")
470477
}
471478
default:
472-
return nil, fmt.Errorf("Unsupported extension '%s' for pipeline file: %s", extension, path)
479+
return nil, fmt.Errorf("unsupported extension '%s' for pipeline file: %s", extension, path)
473480
}
474481

475482
pipelineID := fs.pipelineIDs[idx]

0 commit comments

Comments
 (0)