From a35b29cfce1b882f6c266e8b347deb10095d4be2 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 16 Mar 2022 10:37:00 -0400 Subject: [PATCH] Fix/change the initialization of management layer (#30694) (#30807) * Ensure that libbeat manager is instantiated after the hooks. This fix an issues on Filebeat that make the start sequence of filebeat non-deterministic. It was possible that not all the hooks were configured correctly before the managed was receiving a configuration from the Elastic Agent. This causes an inconsistency between the expected configuration state and the actual running states, this includes the following symptoms: - Having Filebeat runnings and not sending any data to Elasticsearch - Having Filebeat partially configured, when only some inputs were sending data. - Missing log from the Filebeat collector - Having only metricsbeats running and sending logs. This solves the issues by moving the `Start` and stop `Stop` of the managed into the beats initialization process, each beats need to be adjusted to support. This is indeed a breaking changes for beats author, but the bootstrap process of beats and libbeat cannot easily be extended to make the change into a unique place. (cherry picked from commit 4c14f03ff73c5dd58bcb617e42195a8ff76848a8) Co-authored-by: Pier-Hugues Pellerin --- CHANGELOG.next.asciidoc | 1 + filebeat/beater/filebeat.go | 8 +++++ heartbeat/beater/heartbeat.go | 6 ++++ libbeat/cmd/instance/beat.go | 5 ++- libbeat/management/management.go | 40 +++++++++++++++++------- metricbeat/beater/metricbeat.go | 8 +++++ packetbeat/beater/packetbeat.go | 13 ++++++-- x-pack/libbeat/management/manager.go | 30 +++++++++++++----- x-pack/osquerybeat/beater/osquerybeat.go | 7 +++++ 9 files changed, 94 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b83abc23575..69b30a19928 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -39,6 +39,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif - Fixes Beats crashing when glibc >= 2.35 is used {issue}30576[30576] - Log errors when parsing and applying config blocks and if the input is disabled. {pull}30534[30534] - Wildcard fields no longer have a default ignore_above setting of 1024. {issue}30096[30096] {pull}30668[30668] +- Ensure that the Reloadable part of beats are initialized before the Manager is started. {issue}30533[30533] - Ignore bugfix version when running version compatibility check against Elasticsearch. {pull}30746[30746] *Auditbeat* diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index c266faf10ae..d38cd65366a 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -381,6 +381,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error { } adiscover.Start() + // We start the manager when all the subsystem are initialized and ready to received events. + if err := b.Manager.Start(); err != nil { + return err + } + // Add done channel to wait for shutdown signal waitFinished.AddChan(fb.done) waitFinished.Wait() @@ -411,6 +416,9 @@ func (fb *Filebeat) Run(b *beat.Beat) error { } } + // Stop the manager and stop the connection to any dependent services. + b.Manager.Stop() + return nil } diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index 003f4da5934..da2c5c55bbb 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -113,6 +113,12 @@ func (bt *Heartbeat) Run(b *beat.Beat) error { return err } } + // Configure the beats Manager to start after all the reloadable hooks are initialized + // and shutdown when the function return. + if err := b.Manager.Start(); err != nil { + return err + } + defer b.Manager.Stop() if bt.config.Autodiscover != nil { bt.autodiscover, err = bt.makeAutodiscover(b) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index b18192d0fd4..ddbba363b98 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -488,9 +488,8 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { logp.Info("%s start running.", b.Info.Beat) - // Launch config manager - b.Manager.Start(beater.Stop) - defer b.Manager.Stop() + // Allow the manager to stop a currently running beats out of bound. + b.Manager.SetStopCallback(beater.Stop) return beater.Run(&b.Beat) } diff --git a/libbeat/management/management.go b/libbeat/management/management.go index b29dd74a3c9..b16ee770c4a 100644 --- a/libbeat/management/management.go +++ b/libbeat/management/management.go @@ -74,22 +74,38 @@ type Manager interface { // Enabled returns true if manager is enabled. Enabled() bool - // Start the config manager giving it a stopFunc callback - // so the beat can be told when to stop. - Start(stopFunc func()) - - // Stop the config manager. + // Start needs to invoked when the system is ready to receive an external configuration and + // also ready to start ingesting new events. The manager expects that all the reloadable and + // reloadable list are fixed for the whole lifetime of the manager. + // + // Notes: Adding dynamically new reloadable hooks at runtime can lead to inconsistency in the + // execution. + Start() error + + // Stop when this method is called, the manager will stop receiving new actions, no more action + // will be propagated to the handlers and will not try to configure any reloadable parts. + // When the manager is stop the callback will be called to signal that the system can terminate. + // + // Calls to 'CheckRawConfig()' or 'SetPayload()' will be ignored after calling stop. + // + // Note: Stop will not call 'UnregisterAction()' automaticallty. Stop() + // SetStopCallback accepts a function that need to be called when the manager want to shutdown the + // beats. This is needed when you want your beats to be gracefully shutdown remotely by the Elastic Agent + // when a policy doesn't need to run this beat. + SetStopCallback(f func()) + // CheckRawConfig check settings are correct before launching the beat. CheckRawConfig(cfg *common.Config) error // RegisterAction registers action handler with the client RegisterAction(action client.Action) + // UnregisterAction unregisters action handler with the client UnregisterAction(action client.Action) - // SetPayload sets the client payload + // SetPayload Allows to add additional metadata to future requests made by the manager. SetPayload(map[string]interface{}) } @@ -136,10 +152,11 @@ func defaultModeConfig() *modeConfig { // nilManager, fallback when no manager is present type nilManager struct { - logger *logp.Logger - lock sync.Mutex - status Status - msg string + logger *logp.Logger + lock sync.Mutex + status Status + msg string + stopFunc func() } func nilFactory(*common.Config, *reload.Registry, uuid.UUID) (Manager, error) { @@ -151,8 +168,9 @@ func nilFactory(*common.Config, *reload.Registry, uuid.UUID) (Manager, error) { }, nil } +func (*nilManager) SetStopCallback(func()) {} func (*nilManager) Enabled() bool { return false } -func (*nilManager) Start(_ func()) {} +func (*nilManager) Start() error { return nil } func (*nilManager) Stop() {} func (*nilManager) CheckRawConfig(cfg *common.Config) error { return nil } func (n *nilManager) UpdateStatus(status Status, msg string) { diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index 75e6f16652e..9b24c290ba5 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -227,6 +227,13 @@ func (bt *Metricbeat) Run(b *beat.Beat) error { modules.Stop() }() + // Start the manager after all the reload hooks are configured, + // the Manager is stopped at the end of the execution. + if err := b.Manager.Start(); err != nil { + return err + } + defer b.Manager.Stop() + // Dynamic file based modules (metricbeat.config.modules) if bt.config.ConfigModules.Enabled() { moduleReloader := cfgfile.NewReloader(b.Publisher, bt.config.ConfigModules) @@ -256,6 +263,7 @@ func (bt *Metricbeat) Run(b *beat.Beat) error { } wg.Wait() + return nil } diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index bac5acf6f5b..d5d4cffdac6 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -147,10 +147,19 @@ func (pb *packetbeat) runStatic(b *beat.Beat, factory *processorFactory) error { func (pb *packetbeat) runManaged(b *beat.Beat, factory *processorFactory) error { runner := newReloader(management.DebugK, factory, b.Publisher) reload.Register.MustRegisterList("inputs", runner) - defer runner.Stop() - logp.Debug("main", "Waiting for the runner to finish") + // Start the manager after all the hooks are registered and terminates when + // the function return. + if err := b.Manager.Start(); err != nil { + return err + } + + defer func() { + runner.Stop() + b.Manager.Stop() + }() + for { select { case <-pb.done: diff --git a/x-pack/libbeat/management/manager.go b/x-pack/libbeat/management/manager.go index e52ebc4eeb7..2cb3cd4b475 100644 --- a/x-pack/libbeat/management/manager.go +++ b/x-pack/libbeat/management/manager.go @@ -96,27 +96,36 @@ func (cm *Manager) Enabled() bool { return cm.config.Enabled } -// Start the config manager -func (cm *Manager) Start(stopFunc func()) { - if !cm.Enabled() { - return - } +// SetStopCallback sets the callback to run when the manager want to shutdown the beats gracefully. +func (cm *Manager) SetStopCallback(stopFunc func()) { + cm.lock.Lock() + defer cm.lock.Unlock() + cm.stopFunc = stopFunc +} +// Start the config manager. +func (cm *Manager) Start() error { cm.lock.Lock() defer cm.lock.Unlock() + if !cm.Enabled() { + return nil + } + cfgwarn.Beta("Fleet management is enabled") cm.logger.Info("Starting fleet management service") - cm.stopFunc = stopFunc cm.isRunning = true err := cm.client.Start(context.Background()) if err != nil { cm.logger.Errorf("failed to start elastic-agent-client: %s", err) + return err } + cm.logger.Info("Ready to receive configuration") + return nil } -// Stop the config manager +// Stop stops the current Manager and close the connection to Elastic Agent. func (cm *Manager) Stop() { cm.lock.Lock() defer cm.lock.Unlock() @@ -133,6 +142,8 @@ func (cm *Manager) Stop() { // CheckRawConfig check settings are correct to start the beat. This method // checks there are no collision between the existing configuration and what // fleet management can configure. +// +// NOTE: This is currently not implemented for fleet. func (cm *Manager) CheckRawConfig(cfg *common.Config) error { // TODO implement this method return nil @@ -217,6 +228,9 @@ func (cm *Manager) SetPayload(payload map[string]interface{}) { } func (cm *Manager) OnStop() { + cm.lock.Lock() + defer cm.lock.Unlock() + if cm.stopFunc != nil { cm.client.Status(proto.StateObserved_STOPPING, "Stopping", nil) cm.stopFunc() @@ -320,7 +334,7 @@ func (cm *Manager) toConfigBlocks(cfg common.MapStr) (ConfigBlocks, error) { for _, regName := range cm.registry.GetRegisteredNames() { iBlock, err := cfg.GetValue(regName) if err != nil { - cm.logger.Errorf("failed to get '%s' from config: %v. Continuing to next one", regName, err) + cm.logger.Warnf("failed to get '%s' from config: %v. Continuing to next one", regName, err) continue } diff --git a/x-pack/osquerybeat/beater/osquerybeat.go b/x-pack/osquerybeat/beater/osquerybeat.go index 2a658aefffd..7832e184955 100644 --- a/x-pack/osquerybeat/beater/osquerybeat.go +++ b/x-pack/osquerybeat/beater/osquerybeat.go @@ -169,6 +169,13 @@ func (bt *osquerybeat) Run(b *beat.Beat) error { runner.Update(ctx, bt.config.Inputs) } + // Ensure that all the hooks and actions are ready before starting the Manager + // to receive configuration. + if err := b.Manager.Start(); err != nil { + return err + } + defer b.Manager.Stop() + // Set the osquery beat version to the manager payload. This allows the bundled osquery version to be reported to the stack. bt.setManagerPayload(b)