Skip to content

Commit

Permalink
Fix/change the initialization of management layer (#30694) (#30807)
Browse files Browse the repository at this point in the history
* Ensure that libbeat manager is instantiated after the hooks.

This fix an issues on Filebeat that make the start sequence of filebeat
non-deterministic. It was possible that not all the hooks were
configured correctly before the managed was receiving a configuration
from the Elastic Agent.

This causes an inconsistency between the expected configuration state
and the actual running states, this includes the following symptoms:

- Having Filebeat runnings and not sending any data to Elasticsearch
- Having Filebeat partially configured, when only some inputs were
  sending data.
- Missing log from the Filebeat collector
- Having only metricsbeats running and sending logs.

This solves the issues by moving the `Start` and stop `Stop` of the
managed into the beats initialization process, each beats need to be
adjusted to support. This is indeed a breaking changes for beats author,
but the bootstrap process of beats and libbeat cannot easily be
extended to make the change into a unique place.

(cherry picked from commit 4c14f03)

Co-authored-by: Pier-Hugues Pellerin <phpellerin@gmail.com>
  • Loading branch information
mergify[bot] and ph committed Mar 16, 2022
1 parent 202e33d commit a35b29c
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 24 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif
- Fixes Beats crashing when glibc >= 2.35 is used {issue}30576[30576]
- Log errors when parsing and applying config blocks and if the input is disabled. {pull}30534[30534]
- Wildcard fields no longer have a default ignore_above setting of 1024. {issue}30096[30096] {pull}30668[30668]
- Ensure that the Reloadable part of beats are initialized before the Manager is started. {issue}30533[30533]
- Ignore bugfix version when running version compatibility check against Elasticsearch. {pull}30746[30746]

*Auditbeat*
Expand Down
8 changes: 8 additions & 0 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
}
adiscover.Start()

// We start the manager when all the subsystem are initialized and ready to received events.
if err := b.Manager.Start(); err != nil {
return err
}

// Add done channel to wait for shutdown signal
waitFinished.AddChan(fb.done)
waitFinished.Wait()
Expand Down Expand Up @@ -411,6 +416,9 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
}
}

// Stop the manager and stop the connection to any dependent services.
b.Manager.Stop()

return nil
}

Expand Down
6 changes: 6 additions & 0 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
return err
}
}
// Configure the beats Manager to start after all the reloadable hooks are initialized
// and shutdown when the function return.
if err := b.Manager.Start(); err != nil {
return err
}
defer b.Manager.Stop()

if bt.config.Autodiscover != nil {
bt.autodiscover, err = bt.makeAutodiscover(b)
Expand Down
5 changes: 2 additions & 3 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,9 +488,8 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {

logp.Info("%s start running.", b.Info.Beat)

// Launch config manager
b.Manager.Start(beater.Stop)
defer b.Manager.Stop()
// Allow the manager to stop a currently running beats out of bound.
b.Manager.SetStopCallback(beater.Stop)

return beater.Run(&b.Beat)
}
Expand Down
40 changes: 29 additions & 11 deletions libbeat/management/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,38 @@ type Manager interface {
// Enabled returns true if manager is enabled.
Enabled() bool

// Start the config manager giving it a stopFunc callback
// so the beat can be told when to stop.
Start(stopFunc func())

// Stop the config manager.
// Start needs to invoked when the system is ready to receive an external configuration and
// also ready to start ingesting new events. The manager expects that all the reloadable and
// reloadable list are fixed for the whole lifetime of the manager.
//
// Notes: Adding dynamically new reloadable hooks at runtime can lead to inconsistency in the
// execution.
Start() error

// Stop when this method is called, the manager will stop receiving new actions, no more action
// will be propagated to the handlers and will not try to configure any reloadable parts.
// When the manager is stop the callback will be called to signal that the system can terminate.
//
// Calls to 'CheckRawConfig()' or 'SetPayload()' will be ignored after calling stop.
//
// Note: Stop will not call 'UnregisterAction()' automaticallty.
Stop()

// SetStopCallback accepts a function that need to be called when the manager want to shutdown the
// beats. This is needed when you want your beats to be gracefully shutdown remotely by the Elastic Agent
// when a policy doesn't need to run this beat.
SetStopCallback(f func())

// CheckRawConfig check settings are correct before launching the beat.
CheckRawConfig(cfg *common.Config) error

// RegisterAction registers action handler with the client
RegisterAction(action client.Action)

// UnregisterAction unregisters action handler with the client
UnregisterAction(action client.Action)

// SetPayload sets the client payload
// SetPayload Allows to add additional metadata to future requests made by the manager.
SetPayload(map[string]interface{})
}

Expand Down Expand Up @@ -136,10 +152,11 @@ func defaultModeConfig() *modeConfig {

// nilManager, fallback when no manager is present
type nilManager struct {
logger *logp.Logger
lock sync.Mutex
status Status
msg string
logger *logp.Logger
lock sync.Mutex
status Status
msg string
stopFunc func()
}

func nilFactory(*common.Config, *reload.Registry, uuid.UUID) (Manager, error) {
Expand All @@ -151,8 +168,9 @@ func nilFactory(*common.Config, *reload.Registry, uuid.UUID) (Manager, error) {
}, nil
}

func (*nilManager) SetStopCallback(func()) {}
func (*nilManager) Enabled() bool { return false }
func (*nilManager) Start(_ func()) {}
func (*nilManager) Start() error { return nil }
func (*nilManager) Stop() {}
func (*nilManager) CheckRawConfig(cfg *common.Config) error { return nil }
func (n *nilManager) UpdateStatus(status Status, msg string) {
Expand Down
8 changes: 8 additions & 0 deletions metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,13 @@ func (bt *Metricbeat) Run(b *beat.Beat) error {
modules.Stop()
}()

// Start the manager after all the reload hooks are configured,
// the Manager is stopped at the end of the execution.
if err := b.Manager.Start(); err != nil {
return err
}
defer b.Manager.Stop()

// Dynamic file based modules (metricbeat.config.modules)
if bt.config.ConfigModules.Enabled() {
moduleReloader := cfgfile.NewReloader(b.Publisher, bt.config.ConfigModules)
Expand Down Expand Up @@ -256,6 +263,7 @@ func (bt *Metricbeat) Run(b *beat.Beat) error {
}

wg.Wait()

return nil
}

Expand Down
13 changes: 11 additions & 2 deletions packetbeat/beater/packetbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,19 @@ func (pb *packetbeat) runStatic(b *beat.Beat, factory *processorFactory) error {
func (pb *packetbeat) runManaged(b *beat.Beat, factory *processorFactory) error {
runner := newReloader(management.DebugK, factory, b.Publisher)
reload.Register.MustRegisterList("inputs", runner)
defer runner.Stop()

logp.Debug("main", "Waiting for the runner to finish")

// Start the manager after all the hooks are registered and terminates when
// the function return.
if err := b.Manager.Start(); err != nil {
return err
}

defer func() {
runner.Stop()
b.Manager.Stop()
}()

for {
select {
case <-pb.done:
Expand Down
30 changes: 22 additions & 8 deletions x-pack/libbeat/management/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,27 +96,36 @@ func (cm *Manager) Enabled() bool {
return cm.config.Enabled
}

// Start the config manager
func (cm *Manager) Start(stopFunc func()) {
if !cm.Enabled() {
return
}
// SetStopCallback sets the callback to run when the manager want to shutdown the beats gracefully.
func (cm *Manager) SetStopCallback(stopFunc func()) {
cm.lock.Lock()
defer cm.lock.Unlock()
cm.stopFunc = stopFunc
}

// Start the config manager.
func (cm *Manager) Start() error {
cm.lock.Lock()
defer cm.lock.Unlock()

if !cm.Enabled() {
return nil
}

cfgwarn.Beta("Fleet management is enabled")
cm.logger.Info("Starting fleet management service")

cm.stopFunc = stopFunc
cm.isRunning = true
err := cm.client.Start(context.Background())
if err != nil {
cm.logger.Errorf("failed to start elastic-agent-client: %s", err)
return err
}
cm.logger.Info("Ready to receive configuration")
return nil
}

// Stop the config manager
// Stop stops the current Manager and close the connection to Elastic Agent.
func (cm *Manager) Stop() {
cm.lock.Lock()
defer cm.lock.Unlock()
Expand All @@ -133,6 +142,8 @@ func (cm *Manager) Stop() {
// CheckRawConfig check settings are correct to start the beat. This method
// checks there are no collision between the existing configuration and what
// fleet management can configure.
//
// NOTE: This is currently not implemented for fleet.
func (cm *Manager) CheckRawConfig(cfg *common.Config) error {
// TODO implement this method
return nil
Expand Down Expand Up @@ -217,6 +228,9 @@ func (cm *Manager) SetPayload(payload map[string]interface{}) {
}

func (cm *Manager) OnStop() {
cm.lock.Lock()
defer cm.lock.Unlock()

if cm.stopFunc != nil {
cm.client.Status(proto.StateObserved_STOPPING, "Stopping", nil)
cm.stopFunc()
Expand Down Expand Up @@ -320,7 +334,7 @@ func (cm *Manager) toConfigBlocks(cfg common.MapStr) (ConfigBlocks, error) {
for _, regName := range cm.registry.GetRegisteredNames() {
iBlock, err := cfg.GetValue(regName)
if err != nil {
cm.logger.Errorf("failed to get '%s' from config: %v. Continuing to next one", regName, err)
cm.logger.Warnf("failed to get '%s' from config: %v. Continuing to next one", regName, err)
continue
}

Expand Down
7 changes: 7 additions & 0 deletions x-pack/osquerybeat/beater/osquerybeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,13 @@ func (bt *osquerybeat) Run(b *beat.Beat) error {
runner.Update(ctx, bt.config.Inputs)
}

// Ensure that all the hooks and actions are ready before starting the Manager
// to receive configuration.
if err := b.Manager.Start(); err != nil {
return err
}
defer b.Manager.Stop()

// Set the osquery beat version to the manager payload. This allows the bundled osquery version to be reported to the stack.
bt.setManagerPayload(b)

Expand Down

0 comments on commit a35b29c

Please sign in to comment.