Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply Autodiscovery dynamic fields in autoreloading #19135

Merged
merged 8 commits into from
Jun 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- The Elasticsearch client settings expect the API key to be raw (not base64-encoded). {issue}18939[18939] {pull}18945[18945]
- `management.ConfigManager` has been renamed to `management.Manager`. {pull}19114[19114]
- `UpdateStatus` has been added to the `management.Manager` interface. {pull}19114[19114]
- Remove `common.MapStrPointer` parameter from `cfgfile.Runnerfactory` interface. {pull}19135[19135]

==== Bugfixes

Expand Down
2 changes: 1 addition & 1 deletion filebeat/beater/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (c *crawler) startInput(
return fmt.Errorf("input with same ID already exists: %v", id)
}

runner, err := c.inputsFactory.Create(pipeline, config, nil)
runner, err := c.inputsFactory.Create(pipeline, config)
if err != nil {
return fmt.Errorf("Error while initializing input: %+v", err)
}
Expand Down
13 changes: 4 additions & 9 deletions filebeat/channel/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type onCreateFactory struct {
create onCreateWrapper
}

type onCreateWrapper func(cfgfile.RunnerFactory, beat.PipelineConnector, *common.Config, *common.MapStrPointer) (cfgfile.Runner, error)
type onCreateWrapper func(cfgfile.RunnerFactory, beat.PipelineConnector, *common.Config) (cfgfile.Runner, error)

// commonInputConfig defines common input settings
// for the publisher pipeline.
Expand Down Expand Up @@ -63,12 +63,8 @@ func (f *onCreateFactory) CheckConfig(cfg *common.Config) error {
return f.factory.CheckConfig(cfg)
}

func (f *onCreateFactory) Create(
pipeline beat.PipelineConnector,
cfg *common.Config,
meta *common.MapStrPointer,
) (cfgfile.Runner, error) {
return f.create(f.factory, pipeline, cfg, meta)
func (f *onCreateFactory) Create(pipeline beat.PipelineConnector, cfg *common.Config) (cfgfile.Runner, error) {
return f.create(f.factory, pipeline, cfg)
}

// RunnerFactoryWithCommonInputSettings wraps a runner factory, such that all runners
Expand All @@ -93,14 +89,13 @@ func RunnerFactoryWithCommonInputSettings(info beat.Info, f cfgfile.RunnerFactor
f cfgfile.RunnerFactory,
pipeline beat.PipelineConnector,
cfg *common.Config,
meta *common.MapStrPointer,
) (runner cfgfile.Runner, err error) {
pipeline, err = withClientConfig(info, pipeline, cfg)
if err != nil {
return nil, err
}

return f.Create(pipeline, cfg, meta)
return f.Create(pipeline, cfg)
})
}

Expand Down
6 changes: 3 additions & 3 deletions filebeat/fileset/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewFactory(
}

// Create creates a module based on a config
func (f *Factory) Create(p beat.PipelineConnector, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
func (f *Factory) Create(p beat.PipelineConnector, c *common.Config) (cfgfile.Runner, error) {
// Start a registry of one module:
m, err := NewModuleRegistry([]*common.Config{c}, f.beatInfo, false)
if err != nil {
Expand All @@ -98,7 +98,7 @@ func (f *Factory) Create(p beat.PipelineConnector, c *common.Config, meta *commo

inputs := make([]cfgfile.Runner, len(pConfigs))
for i, pConfig := range pConfigs {
inputs[i], err = f.inputFactory.Create(p, pConfig, meta)
inputs[i], err = f.inputFactory.Create(p, pConfig)
if err != nil {
logp.Err("Error creating input: %s", err)
return nil, err
Expand All @@ -116,7 +116,7 @@ func (f *Factory) Create(p beat.PipelineConnector, c *common.Config, meta *commo
}

func (f *Factory) CheckConfig(c *common.Config) error {
_, err := f.Create(pubpipeline.NewNilPipeline(), c, nil)
_, err := f.Create(pubpipeline.NewNilPipeline(), c)
return err
}

Expand Down
4 changes: 2 additions & 2 deletions filebeat/fileset/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewSetupFactory(beatInfo beat.Info, pipelineLoaderFactory PipelineLoaderFac
}

// Create creates a new SetupCfgRunner to setup module configuration.
func (sf *SetupFactory) Create(_ beat.PipelineConnector, c *common.Config, _ *common.MapStrPointer) (cfgfile.Runner, error) {
func (sf *SetupFactory) Create(_ beat.PipelineConnector, c *common.Config) (cfgfile.Runner, error) {
m, err := NewModuleRegistry([]*common.Config{c}, sf.beatInfo, false)
if err != nil {
return nil, err
Expand All @@ -56,7 +56,7 @@ func (sf *SetupFactory) Create(_ beat.PipelineConnector, c *common.Config, _ *co
}

func (sf *SetupFactory) CheckConfig(c *common.Config) error {
_, err := sf.Create(pubpipeline.NewNilPipeline(), c, nil)
_, err := sf.Create(pubpipeline.NewNilPipeline(), c)
return err
}

Expand Down
10 changes: 4 additions & 6 deletions filebeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ func New(
connector channel.Connector,
beatDone chan struct{},
states []file.State,
dynFields *common.MapStrPointer,
) (*Runner, error) {
input := &Runner{
config: defaultConfig,
Expand All @@ -82,11 +81,10 @@ func New(
}

context := Context{
States: states,
Done: input.done,
BeatDone: input.beatDone,
DynamicFields: dynFields,
Meta: nil,
States: states,
Done: input.done,
BeatDone: input.beatDone,
Meta: nil,
}
var ipt Input
ipt, err = f(conf, connector, context)
Expand Down
3 changes: 0 additions & 3 deletions filebeat/input/kafka/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,6 @@ func NewInput(
}

out, err := connector.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
DynamicFields: inputContext.DynamicFields,
},
ACKEvents: func(events []interface{}) {
for _, event := range events {
if meta, ok := event.(eventMeta); ok {
Expand Down
6 changes: 1 addition & 5 deletions filebeat/input/log/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,7 @@ func NewInput(
// The outlet generated here is the underlying outlet, only closed
// once all workers have been shut down.
// For state updates and events, separate sub-outlets will be used.
out, err := outlet.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
DynamicFields: context.DynamicFields,
},
})
out, err := outlet.Connect(cfg)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions filebeat/input/mqtt/client_mocked.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ type mockedConnector struct {

var _ channel.Connector = new(mockedConnector)

func (m *mockedConnector) Connect(*common.Config) (channel.Outleter, error) {
panic("implement me")
func (m *mockedConnector) Connect(c *common.Config) (channel.Outleter, error) {
return m.ConnectWith(c, beat.ClientConfig{})
}

func (m *mockedConnector) ConnectWith(*common.Config, beat.ClientConfig) (channel.Outleter, error) {
Expand Down
6 changes: 1 addition & 5 deletions filebeat/input/mqtt/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,7 @@ func newInput(
return nil, errors.Wrap(err, "reading mqtt input config")
}

out, err := connector.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
DynamicFields: inputContext.DynamicFields,
},
})
out, err := connector.Connect(cfg)
if err != nil {
return nil, err
}
Expand Down
7 changes: 1 addition & 6 deletions filebeat/input/redis/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/elastic/beats/v7/filebeat/harvester"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/filebeat/input/file"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/logp"
Expand Down Expand Up @@ -59,11 +58,7 @@ func NewInput(cfg *common.Config, connector channel.Connector, context input.Con
return nil, err
}

out, err := connector.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
DynamicFields: context.DynamicFields,
},
})
out, err := connector.Connect(cfg)
if err != nil {
return nil, err
}
Expand Down
9 changes: 4 additions & 5 deletions filebeat/input/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@ import (
)

type Context struct {
States []file.State
Done chan struct{}
BeatDone chan struct{}
DynamicFields *common.MapStrPointer
Meta map[string]string
States []file.State
Done chan struct{}
BeatDone chan struct{}
Meta map[string]string
}

// Factory is used to register functions creating new Input instances.
Expand Down
5 changes: 2 additions & 3 deletions filebeat/input/runnerfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@ func NewRunnerFactory(outlet channel.Factory, registrar *registrar.Registrar, be
func (r *RunnerFactory) Create(
pipeline beat.PipelineConnector,
c *common.Config,
meta *common.MapStrPointer,
) (cfgfile.Runner, error) {
connector := r.outlet(pipeline)
p, err := New(c, connector, r.beatDone, r.registrar.GetStates(), meta)
p, err := New(c, connector, r.beatDone, r.registrar.GetStates())
if err != nil {
// In case of error with loading state, input is still returned
return p, err
Expand All @@ -59,6 +58,6 @@ func (r *RunnerFactory) Create(
}

func (r *RunnerFactory) CheckConfig(cfg *common.Config) error {
_, err := r.Create(pipeline.NewNilPipeline(), cfg, nil)
_, err := r.Create(pipeline.NewNilPipeline(), cfg)
return err
}
7 changes: 1 addition & 6 deletions filebeat/input/stdin/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/filebeat/input/file"
"github.com/elastic/beats/v7/filebeat/input/log"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)
Expand All @@ -49,11 +48,7 @@ type Input struct {
// NewInput creates a new stdin input
// This input contains one harvester which is reading from stdin
func NewInput(cfg *common.Config, outlet channel.Connector, context input.Context) (input.Input, error) {
out, err := outlet.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
DynamicFields: context.DynamicFields,
},
})
out, err := outlet.Connect(cfg)
if err != nil {
return nil, err
}
Expand Down
6 changes: 1 addition & 5 deletions filebeat/input/syslog/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,7 @@ func NewInput(

log := logp.NewLogger("syslog")

out, err := outlet.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
DynamicFields: context.DynamicFields,
},
})
out, err := outlet.Connect(cfg)
if err != nil {
return nil, err
}
Expand Down
6 changes: 1 addition & 5 deletions filebeat/input/tcp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,7 @@ func NewInput(
context input.Context,
) (input.Input, error) {

out, err := connector.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
DynamicFields: context.DynamicFields,
},
})
out, err := connector.Connect(cfg)
if err != nil {
return nil, err
}
Expand Down
6 changes: 1 addition & 5 deletions filebeat/input/udp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,7 @@ func NewInput(
context input.Context,
) (input.Input, error) {

out, err := outlet.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
DynamicFields: context.DynamicFields,
},
})
out, err := outlet.Connect(cfg)
if err != nil {
return nil, err
}
Expand Down
6 changes: 1 addition & 5 deletions filebeat/input/unix/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,7 @@ func NewInput(
) (input.Input, error) {
cfgwarn.Beta("Unix socket support is beta.")

out, err := connector.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
DynamicFields: context.DynamicFields,
},
})
out, err := connector.Connect(cfg)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) error {
factory := monitors.NewFactory(bt.scheduler, true)

for _, cfg := range bt.config.Monitors {
created, err := factory.Create(b.Publisher, cfg, nil)
created, err := factory.Create(b.Publisher, cfg)
if err != nil {
return errors.Wrap(err, "could not create monitor")
}
Expand Down
4 changes: 2 additions & 2 deletions heartbeat/monitors/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func NewFactory(sched *scheduler.Scheduler, allowWatches bool) *RunnerFactory {
}

// Create makes a new Runner for a new monitor with the given Config.
func (f *RunnerFactory) Create(p beat.PipelineConnector, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
monitor, err := newMonitor(c, globalPluginsReg, p, f.sched, f.allowWatches, meta)
func (f *RunnerFactory) Create(p beat.PipelineConnector, c *common.Config) (cfgfile.Runner, error) {
monitor, err := newMonitor(c, globalPluginsReg, p, f.sched, f.allowWatches)
return monitor, err
}

Expand Down
10 changes: 3 additions & 7 deletions heartbeat/monitors/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ type Monitor struct {

// stats is the countersRecorder used to record lifecycle events
// for global metrics + telemetry
stats registryRecorder
factoryMetadata *common.MapStrPointer
stats registryRecorder
}

// String prints a description of the monitor in a threadsafe way. It is important that this use threadsafe
Expand All @@ -73,7 +72,7 @@ func (m *Monitor) String() string {
}

func checkMonitorConfig(config *common.Config, registrar *pluginsReg, allowWatches bool) error {
m, err := newMonitor(config, registrar, nil, nil, allowWatches, nil)
m, err := newMonitor(config, registrar, nil, nil, allowWatches)
if m != nil {
m.Stop() // Stop the monitor to free up the ID from uniqueness checks
}
Expand Down Expand Up @@ -101,9 +100,8 @@ func newMonitor(
pipelineConnector beat.PipelineConnector,
scheduler *scheduler.Scheduler,
allowWatches bool,
factoryMetadata *common.MapStrPointer,
) (*Monitor, error) {
m, err := newMonitorUnsafe(config, registrar, pipelineConnector, scheduler, allowWatches, factoryMetadata)
m, err := newMonitorUnsafe(config, registrar, pipelineConnector, scheduler, allowWatches)
if m != nil && err != nil {
m.Stop()
}
Expand All @@ -118,7 +116,6 @@ func newMonitorUnsafe(
pipelineConnector beat.PipelineConnector,
scheduler *scheduler.Scheduler,
allowWatches bool,
factoryMetadata *common.MapStrPointer,
) (*Monitor, error) {
// Extract just the Id, Type, and Enabled fields from the config
// We'll parse things more precisely later once we know what exact type of
Expand All @@ -145,7 +142,6 @@ func newMonitorUnsafe(
internalsMtx: sync.Mutex{},
config: config,
stats: monitorPlugin.stats,
factoryMetadata: factoryMetadata,
}

if m.id != "" {
Expand Down
Loading