Skip to content

Commit

Permalink
[libbeat/management]: support filebeat inputs to report their status …
Browse files Browse the repository at this point in the history
…to elastic-agent (#39209)

* feat(management): implement support for reporting input health including multiple streams

* fix: add missing file licenses

* feat(tests): update also units revision number

* feat: make stream statuses explicit in reported payload

* feat: define new env vars to allow overriding scheme and ssl verification in integration framework

* feat: utilise the es stack used through integration framework in CEL integration test

* fix: remove logging from integration test to reduce verbosity

* chore: move UpdateStatus closer to error logging

* Revert "feat: define new env vars to allow overriding scheme and ssl verification in integration framework"

This reverts commit 214cdd1.

* fix: change configStateIdx and a field in the source of input units to match the behavior of elastic-agent

* fix: invoke stringer directly with go run

* fix: add in godoc that StatusReporter for standalone Beat execution is expected to be nil

* fix(linter): preallocate slice for list of names registered

* doc: update CHANGELOG.next.asciidoc

* fix(linter): disable typecheck for rest in walkMap

* fix: return client.UnitStateFailed for unknown statuses

* chore: rename application to unit in statuses godoc to capture better their scope

* fix: do not rely on event["error"] to mark the status of cel input as running

* fix: remove usage of testify assert and require packages

* fix: move integration_test.go to parent directory and add comments to improve the code readability

* fix: improve readability by code minor restructuring and comments revisiting

* fix: remove stubborn t.Logf

* fix: simplify code in calcState

* fix: log when a unit is not found during a unit change event

* feat: add unit ID in the unit-state named logger

* feat: add unit-tests for unit.go

* feat: introduce UpdateStatus at v2.Context

* fix: add missing license header

* feat: report at the unit level the error msg of the first FAILED or DEGRADED stream
  • Loading branch information
pkoutsovasilis committed May 22, 2024
1 parent a6aa347 commit 38a678b
Show file tree
Hide file tree
Showing 13 changed files with 1,370 additions and 127 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Ensure all responses sent by HTTP Endpoint are HTML-escaped. {pull}39329[39329]
- Update CEL mito extensions to v1.11.0 to improve type checking. {pull}39460[39460]
- Improve logging of request and response with request trace logging in error conditions. {pull}39455[39455]
- Implement Elastic Agent status and health reporting for CEL Filebeat input. {pull}39209[39209]
- Add HTTP metrics to CEL input. {issue}39501[39501] {pull}39503[39503]
- Add default user-agent to CEL HTTP requests. {issue}39502[39502] {pull}39587[39587]
- Improve reindexing support in security module pipelines. {issue}38224[38224] {pull}[]
Expand Down
30 changes: 19 additions & 11 deletions filebeat/input/v2/compat/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/management/status"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/go-concert/ctxtool"
Expand All @@ -50,13 +51,14 @@ type factory struct {
// On stop the runner triggers the shutdown signal and waits until the input
// has returned.
type runner struct {
id string
log *logp.Logger
agent *beat.Info
wg sync.WaitGroup
sig ctxtool.CancelContext
input v2.Input
connector beat.PipelineConnector
id string
log *logp.Logger
agent *beat.Info
wg sync.WaitGroup
sig ctxtool.CancelContext
input v2.Input
connector beat.PipelineConnector
statusReporter status.StatusReporter
}

// RunnerFactory creates a cfgfile.RunnerFactory from an input Loader that is
Expand Down Expand Up @@ -109,6 +111,10 @@ func (f *factory) Create(
}, nil
}

// SetStatusReporter stores the reporter that Start hands to the input through
// v2.Context.StatusReporter. It matches the signature of
// status.WithStatusReporter. A nil reporter is allowed.
func (r *runner) SetStatusReporter(reporter status.StatusReporter) {
	r.statusReporter = reporter
}

// String returns the name of the wrapped input.
func (r *runner) String() string {
	return r.input.Name()
}

func (r *runner) Start() {
Expand All @@ -121,10 +127,11 @@ func (r *runner) Start() {
log.Infof("Input '%s' starting", name)
err := r.input.Run(
v2.Context{
ID: r.id,
Agent: *r.agent,
Logger: log,
Cancelation: r.sig,
ID: r.id,
Agent: *r.agent,
Logger: log,
Cancelation: r.sig,
StatusReporter: r.statusReporter,
},
r.connector,
)
Expand All @@ -140,6 +147,7 @@ func (r *runner) Stop() {
r.sig.Cancel()
r.wg.Wait()
r.log.Infof("Input '%s' stopped (runner)", r.input.Name())
r.statusReporter = nil
}

func configID(config *conf.C) (string, error) {
Expand Down
12 changes: 12 additions & 0 deletions filebeat/input/v2/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/management/status"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"

Expand Down Expand Up @@ -83,6 +84,17 @@ type Context struct {

// Cancelation is used by Beats to signal the input to shutdown.
Cancelation Canceler

// StatusReporter provides a method to update the status of the underlying unit
// that maps to the config. Note: Under standalone execution of Filebeat this is
// expected to be nil.
StatusReporter status.StatusReporter
}

// UpdateStatus forwards the given status and message to the context's
// StatusReporter. When no reporter is set (StatusReporter is nil) the call
// is a no-op.
func (c Context) UpdateStatus(status status.Status, msg string) {
	reporter := c.StatusReporter
	if reporter == nil {
		// Nothing to report to.
		return
	}
	reporter.UpdateStatus(status, msg)
}

// TestContext provides the Input Test function with common environmental
Expand Down
7 changes: 7 additions & 0 deletions libbeat/cfgfile/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/diagnostics"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -153,6 +154,12 @@ func (r *RunnerList) Reload(configs []*reload.ConfigWithMeta) error {

r.logger.Debugf("Starting runner: %s", runner)
r.runners[hash] = runner
if config.StatusReporter != nil {
if runnerWithStatus, ok := runner.(status.WithStatusReporter); ok {
runnerWithStatus.SetStatusReporter(config.StatusReporter)
}
}

runner.Start()
moduleStarts.Add(1)
if config.DiagCallback != nil {
Expand Down
8 changes: 7 additions & 1 deletion libbeat/common/reload/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"sync"

"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand All @@ -47,6 +48,11 @@ type ConfigWithMeta struct {

// InputUnitID is the unit's ID that generated this ConfigWithMeta
InputUnitID string

// StatusReporter provides a method to update the status of the underlying unit
// that maps to the config. Note: Under standalone execution of a Beat this is
// expected to be nil.
StatusReporter status.StatusReporter
}

// ReloadableList provides a method to reload the configuration of a list of entities
Expand Down Expand Up @@ -160,7 +166,7 @@ func (r *Registry) GetReloadableOutput() Reloadable {
func (r *Registry) GetRegisteredNames() []string {
r.RLock()
defer r.RUnlock()
var names []string
names := make([]string, 0, len(r.confs)+len(r.confsLists))

for name := range r.confs {
names = append(names, name)
Expand Down
40 changes: 5 additions & 35 deletions libbeat/management/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,49 +21,19 @@ import (
"sync"

"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

// Status describes the current status of the beat. The possible values are
// the constants declared below.
type Status int

//go:generate stringer -type=Status
const (
// Unknown is the initial status, used when none has been reported. It is
// also the zero value of Status.
Unknown Status = iota
// Starting is the status describing an application that is starting.
Starting
// Configuring is the status describing an application that is configuring.
Configuring
// Running is the status describing an application that is running.
Running
// Degraded is the status describing an application that is degraded.
Degraded
// Failed is the status describing an application that has failed. This
// status should only be used when the beat should stop running because the
// failure cannot be recovered.
Failed
// Stopping is the status describing an application that is stopping.
Stopping
// Stopped is the status describing an application that is stopped.
Stopped
)

// DebugK used as key for all things central management
var DebugK = "centralmgmt"

// StatusReporter provides a method to update the current status of the beat.
type StatusReporter interface {
// UpdateStatus is called when the status of the beat has changed; msg is
// the message accompanying the new status.
UpdateStatus(status Status, msg string)
}

// Manager interacts with the beat to provide status updates and to receive
// configurations.
type Manager interface {
StatusReporter
status.StatusReporter

// Enabled returns true if manager is enabled.
Enabled() bool
Expand Down Expand Up @@ -133,7 +103,7 @@ func NewManager(cfg *config.C, registry *reload.Registry) (Manager, error) {
}
return &fallbackManager{
logger: logp.NewLogger("mgmt"),
status: Unknown,
status: status.Unknown,
msg: "",
}, nil
}
Expand All @@ -152,13 +122,13 @@ func SetManagerFactory(factory ManagerFactory) {
type fallbackManager struct {
logger *logp.Logger
lock sync.Mutex
status Status
status status.Status
msg string
stopFunc func()
stopOnce sync.Once
}

func (n *fallbackManager) UpdateStatus(status Status, msg string) {
func (n *fallbackManager) UpdateStatus(status status.Status, msg string) {
n.lock.Lock()
defer n.lock.Unlock()
if n.status != status || n.msg != msg {
Expand Down
55 changes: 55 additions & 0 deletions libbeat/management/status/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package status

// Status describes the current status of the beat or of one of its units.
// The possible values are the constants declared below.
type Status int

//go:generate go run golang.org/x/tools/cmd/stringer -type=Status
const (
// Unknown is the initial status, used when none has been reported. It is
// also the zero value of Status.
Unknown Status = iota
// Starting is the status describing a unit that is starting.
Starting
// Configuring is the status describing a unit that is configuring.
Configuring
// Running is the status describing a unit that is running.
Running
// Degraded is the status describing a unit that is degraded.
Degraded
// Failed is the status describing a unit that has failed. This status
// should only be used when the beat should stop running because the
// failure cannot be recovered.
Failed
// Stopping is the status describing a unit that is stopping.
Stopping
// Stopped is the status describing a unit that is stopped.
Stopped
)

// StatusReporter provides a method to update the current status of a unit.
type StatusReporter interface {
// UpdateStatus updates the status of the unit to status; msg is the
// message accompanying the new status.
UpdateStatus(status Status, msg string)
}

// WithStatusReporter is implemented by types that accept a StatusReporter
// through which they can report their status.
type WithStatusReporter interface {
// SetStatusReporter sets the status reporter to use for subsequent status
// updates.
SetStatusReporter(reporter StatusReporter)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 38a678b

Please sign in to comment.