Skip to content

Commit c51fee3

Browse files
authored
x-pack/filebeat/input/lumberjack: add fleet status update functionality (#44339)
1 parent a8eafbf commit c51fee3

File tree

4 files changed

+27
-7
lines changed

4 files changed

+27
-7
lines changed

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
431431
- Add milliseconds to document timestamp from awscloudwatch Filebeat input {pull}44306[44306]
432432
- Add support to the Active Directory entity analytics provider for device entities. {pull}44309[44309]
433433
- Add support for OPTIONS request to HTTP Endpoint input. {issue}43930[43930] {pull}44387[44387]
434-
434+
- Add Fleet status update functionality to lumberjack input. {issue}44283[44283] {pull}44339[44339]
435435

436436
*Auditbeat*
437437

x-pack/filebeat/input/lumberjack/input.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
inputv2 "github.com/elastic/beats/v7/filebeat/input/v2"
1111
"github.com/elastic/beats/v7/libbeat/beat"
1212
"github.com/elastic/beats/v7/libbeat/feature"
13+
"github.com/elastic/beats/v7/libbeat/management/status"
1314
conf "github.com/elastic/elastic-agent-libs/config"
1415
)
1516

@@ -49,23 +50,27 @@ func newLumberjackInput(lumberjackConfig config) (*lumberjackInput, error) {
4950
func (i *lumberjackInput) Name() string { return inputName }
5051

5152
func (i *lumberjackInput) Test(inputCtx inputv2.TestContext) error {
52-
s, err := newServer(i.config, inputCtx.Logger, nil, nil)
53+
s, err := newServer(i.config, inputCtx.Logger, nil, nil, nil)
5354
if err != nil {
5455
return err
5556
}
5657
return s.Close()
5758
}
5859

5960
func (i *lumberjackInput) Run(inputCtx inputv2.Context, pipeline beat.Pipeline) error {
61+
inputCtx.UpdateStatus(status.Starting, "")
6062
inputCtx.Logger.Info("Starting " + inputName + " input")
6163
defer inputCtx.Logger.Info(inputName + " input stopped")
6264

65+
inputCtx.UpdateStatus(status.Configuring, "")
6366
// Create client for publishing events and receive notification of their ACKs.
6467
client, err := pipeline.ConnectWith(beat.ClientConfig{
6568
EventListener: newEventACKHandler(),
6669
})
6770
if err != nil {
68-
return fmt.Errorf("failed to create pipeline client: %w", err)
71+
err := fmt.Errorf("failed to create pipeline client: %w", err)
72+
inputCtx.UpdateStatus(status.Failed, err.Error())
73+
return err
6974
}
7075
defer client.Close()
7176

@@ -74,7 +79,7 @@ func (i *lumberjackInput) Run(inputCtx inputv2.Context, pipeline beat.Pipeline)
7479
metrics := newInputMetrics(inputCtx.ID, nil)
7580
defer metrics.Close()
7681

77-
s, err := newServer(i.config, inputCtx.Logger, client.Publish, metrics)
82+
s, err := newServer(i.config, inputCtx.Logger, client.Publish, inputCtx.StatusReporter, metrics)
7883
if err != nil {
7984
return err
8085
}
@@ -83,6 +88,7 @@ func (i *lumberjackInput) Run(inputCtx inputv2.Context, pipeline beat.Pipeline)
8388
// Shutdown the server when cancellation is signaled.
8489
go func() {
8590
<-inputCtx.Cancelation.Done()
91+
inputCtx.UpdateStatus(status.Stopping, "")
8692
s.Close()
8793
}()
8894

x-pack/filebeat/input/lumberjack/server.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"golang.org/x/net/netutil"
1515

1616
"github.com/elastic/beats/v7/libbeat/beat"
17+
"github.com/elastic/beats/v7/libbeat/management/status"
1718
"github.com/elastic/elastic-agent-libs/logp"
1819
"github.com/elastic/elastic-agent-libs/monitoring"
1920
"github.com/elastic/elastic-agent-libs/transport/tlscommon"
@@ -23,6 +24,7 @@ import (
2324

2425
type server struct {
2526
config config
27+
status status.StatusReporter
2628
log *logp.Logger
2729
publish func(beat.Event)
2830
metrics *inputMetrics
@@ -31,9 +33,13 @@ type server struct {
3133
bindAddress string
3234
}
3335

34-
func newServer(c config, log *logp.Logger, pub func(beat.Event), metrics *inputMetrics) (*server, error) {
36+
func newServer(c config, log *logp.Logger, pub func(beat.Event), stat status.StatusReporter, metrics *inputMetrics) (*server, error) {
37+
if stat == nil {
38+
stat = noopReporter{}
39+
}
3540
ljSvr, bindAddress, err := newLumberjack(c)
3641
if err != nil {
42+
stat.UpdateStatus(status.Failed, "failed to start lumberjack server: "+err.Error())
3743
return nil, err
3844
}
3945

@@ -50,6 +56,7 @@ func newServer(c config, log *logp.Logger, pub func(beat.Event), metrics *inputM
5056

5157
return &server{
5258
config: c,
59+
status: stat,
5360
log: log,
5461
publish: pub,
5562
metrics: metrics,
@@ -58,20 +65,27 @@ func newServer(c config, log *logp.Logger, pub func(beat.Event), metrics *inputM
5865
}, nil
5966
}
6067

68+
type noopReporter struct{}
69+
70+
func (noopReporter) UpdateStatus(status.Status, string) {}
71+
6172
func (s *server) Close() error {
73+
s.status.UpdateStatus(status.Stopping, "")
6274
var err error
6375
s.ljSvrCloseOnce.Do(func() {
6476
err = s.ljSvr.Close()
6577
})
78+
s.status.UpdateStatus(status.Stopped, "")
6679
return err
6780
}
6881

6982
func (s *server) Run() error {
7083
// Process batches until the input is stopped.
84+
s.status.UpdateStatus(status.Running, "")
7185
for batch := range s.ljSvr.ReceiveChan() {
7286
s.processBatch(batch)
7387
}
74-
88+
s.status.UpdateStatus(status.Stopped, "")
7589
return nil
7690
}
7791

x-pack/filebeat/input/lumberjack/server_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func testSendReceive(t testing.TB, c config, numberOfEvents int, clientTLSConfig
7878
collect := newEventCollector(ctx, numberOfEvents)
7979

8080
// Start server.
81-
s, err := newServer(c, log, collect.Publish, nil)
81+
s, err := newServer(c, log, collect.Publish, nil, nil)
8282
require.NoError(t, err)
8383
go func() {
8484
<-ctx.Done()

0 commit comments

Comments
 (0)