Skip to content

Commit

Permalink
Add contexts when Beats create connections to ES
Browse files Browse the repository at this point in the history
  • Loading branch information
belimawr committed Sep 19, 2024
1 parent 20b1e3c commit bfa6d6c
Show file tree
Hide file tree
Showing 13 changed files with 70 additions and 34 deletions.
37 changes: 24 additions & 13 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package beater

import (
"context"
"flag"
"fmt"
"path/filepath"
Expand Down Expand Up @@ -195,14 +196,16 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {

overwritePipelines := true
b.OverwritePipelinesCallback = func(esConfig *conf.C) error {
esClient, err := eslegclient.NewConnectedClient(esConfig, "Filebeat")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
esClient, err := eslegclient.NewConnectedClient(ctx, esConfig, "Filebeat")
if err != nil {
return err
}

// When running the subcommand setup, configuration from modules.d directories
// have to be loaded using cfg.Reloader. Otherwise those configurations are skipped.
pipelineLoaderFactory := newPipelineLoaderFactory(b.Config.Output.Config())
pipelineLoaderFactory := newPipelineLoaderFactory(ctx, b.Config.Output.Config())
enableAllFilesets, _ := b.BeatConfig.Bool("config.modules.enable_all_filesets", -1)
forceEnableModuleFilesets, _ := b.BeatConfig.Bool("config.modules.force_enable_module_filesets", -1)
filesetOverrides := fileset.FilesetOverrides{
Expand Down Expand Up @@ -322,14 +325,6 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
outDone := make(chan struct{}) // outDone closes down all active pipeline connections
pipelineConnector := channel.NewOutletFactory(outDone).Create

// Create a ES connection factory for dynamic modules pipeline loading
var pipelineLoaderFactory fileset.PipelineLoaderFactory
if b.Config.Output.Name() == "elasticsearch" {
pipelineLoaderFactory = newPipelineLoaderFactory(b.Config.Output.Config())
} else {
logp.Warn(pipelinesWarning)
}

inputsLogger := logp.NewLogger("input")
v2Inputs := fb.pluginFactory(b.Info, inputsLogger, stateStore)
v2InputLoader, err := v2.NewLoader(inputsLogger, v2Inputs, "type", cfg.DefaultType)
Expand All @@ -350,8 +345,22 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
compat.RunnerFactory(inputsLogger, b.Info, v2InputLoader),
input.NewRunnerFactory(pipelineConnector, registrar, fb.done),
))
moduleLoader := fileset.NewFactory(inputLoader, b.Info, pipelineLoaderFactory, config.OverwritePipelines)

// Create an ES connection factory for dynamic modules pipeline loading
var pipelineLoaderFactory fileset.PipelineLoaderFactory
// The pipelineFactory needs a context to control the connections to ES.
// When the pipelineFactory/ESClient are no longer needed, the context
// must be cancelled. This pipeline factory is used by the moduleLoader,
// which is run by a crawler; whenever this crawler is stopped we also
// cancel the context.
pipelineFactoryCtx, cancelPipelineFactoryCtx := context.WithCancel(context.Background())
defer cancelPipelineFactoryCtx()
if b.Config.Output.Name() == "elasticsearch" {
pipelineLoaderFactory = newPipelineLoaderFactory(pipelineFactoryCtx, b.Config.Output.Config())
} else {
logp.Warn(pipelinesWarning)
}
moduleLoader := fileset.NewFactory(inputLoader, b.Info, pipelineLoaderFactory, config.OverwritePipelines)
crawler, err := newCrawler(inputLoader, moduleLoader, config.Inputs, fb.done, *once)
if err != nil {
logp.Err("Could not init crawler: %v", err)
Expand Down Expand Up @@ -389,6 +398,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
err = crawler.Start(fb.pipeline, config.ConfigInput, config.ConfigModules)
if err != nil {
crawler.Stop()
cancelPipelineFactoryCtx()
return fmt.Errorf("Failed to start crawler: %w", err)
}

Expand Down Expand Up @@ -444,6 +454,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
modules.Stop()
adiscover.Stop()
crawler.Stop()
cancelPipelineFactoryCtx()

timeout := fb.config.ShutdownTimeout
// Checks if on shutdown it should wait for all events to be published
Expand Down Expand Up @@ -487,9 +498,9 @@ func (fb *Filebeat) Stop() {
}

// Create a new pipeline loader (es client) factory
func newPipelineLoaderFactory(esConfig *conf.C) fileset.PipelineLoaderFactory {
func newPipelineLoaderFactory(ctx context.Context, esConfig *conf.C) fileset.PipelineLoaderFactory {
pipelineLoaderFactory := func() (fileset.PipelineLoader, error) {
esClient, err := eslegclient.NewConnectedClient(esConfig, "Filebeat")
esClient, err := eslegclient.NewConnectedClient(ctx, esConfig, "Filebeat")
if err != nil {
return nil, fmt.Errorf("Error creating Elasticsearch client: %w", err)
}
Expand Down
15 changes: 11 additions & 4 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) {
if b.Config.Output.Name() == "elasticsearch" && !b.Manager.Enabled() {
// Connect to ES and setup the State loader if the output is not managed by agent
// Note this, intentionally, blocks until connected or max attempts reached
esClient, err := makeESClient(b.Config.Output.Config(), 3, 2*time.Second)
// TODO(Tiago): I believe this cannot be cancelled here, but all tests are passing
// so I need to find out the correct life cycle for this connection/context
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
esClient, err := makeESClient(ctx, b.Config.Output.Config(), 3, 2*time.Second)
if err != nil {
if parsedConfig.RunOnce {
trace.Abort()
Expand Down Expand Up @@ -275,7 +279,10 @@ func (bt *Heartbeat) RunCentralMgmtMonitors(b *beat.Beat) {
}

// Backoff panics with 0 duration, set to smallest unit
esClient, err := makeESClient(outCfg.Config(), 1, 1*time.Nanosecond)
// TODO(Tiago): find out the correct lifecycle for this context/connection
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
esClient, err := makeESClient(ctx, outCfg.Config(), 1, 1*time.Nanosecond)
if err != nil {
logp.L().Warnf("skipping monitor state management during managed reload: %w", err)
} else {
Expand Down Expand Up @@ -324,7 +331,7 @@ func (bt *Heartbeat) Stop() {
}

// makeESClient establishes an ES connection meant to load monitors' state
func makeESClient(cfg *conf.C, attempts int, wait time.Duration) (*eslegclient.Connection, error) {
func makeESClient(ctx context.Context, cfg *conf.C, attempts int, wait time.Duration) (*eslegclient.Connection, error) {
var (
esClient *eslegclient.Connection
err error
Expand Down Expand Up @@ -353,7 +360,7 @@ func makeESClient(cfg *conf.C, attempts int, wait time.Duration) (*eslegclient.C
}

for i := 0; i < attempts; i++ {
esClient, err = eslegclient.NewConnectedClient(newCfg, "Heartbeat")
esClient, err = eslegclient.NewConnectedClient(ctx, newCfg, "Heartbeat")
if err == nil {
connectDelay.Reset()
return esClient, nil
Expand Down
3 changes: 2 additions & 1 deletion heartbeat/beater/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package beater

import (
"context"
"testing"
"time"

Expand All @@ -39,7 +40,7 @@ func TestMakeESClient(t *testing.T) {
anyAttempt := 1
anyDuration := 1 * time.Second

_, _ = makeESClient(origCfg, anyAttempt, anyDuration)
_, _ = makeESClient(context.Background(), origCfg, anyAttempt, anyDuration)

timeout, err := origCfg.Int("timeout", -1)
require.NoError(t, err)
Expand Down
4 changes: 3 additions & 1 deletion libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,9 @@ func (b *Beat) Setup(settings Settings, bt beat.Creator, setup SetupSettings) er
if !isElasticsearchOutput(outCfg.Name()) {
return fmt.Errorf("index management requested but the Elasticsearch output is not configured/enabled")
}
esClient, err := eslegclient.NewConnectedClient(outCfg.Config(), b.Info.Beat)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
esClient, err := eslegclient.NewConnectedClient(ctx, outCfg.Config(), b.Info.Beat)
if err != nil {
return err
}
Expand Down
5 changes: 4 additions & 1 deletion libbeat/esleg/eslegclient/api_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package eslegclient

import (
"context"
"encoding/json"
"fmt"
"net/http"
Expand Down Expand Up @@ -90,7 +91,9 @@ func TestOneHost500Resp(t *testing.T) {
server := ElasticsearchMock(http.StatusInternalServerError, []byte("Something wrong happened"))

client := newTestConnection(t, server.URL)
err := client.Connect()
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
err := client.Connect(ctx)
if err != nil {
t.Fatalf("Failed to connect: %v", err)
}
Expand Down
5 changes: 4 additions & 1 deletion libbeat/esleg/eslegclient/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package eslegclient

import (
"context"
"encoding/json"
"testing"

Expand Down Expand Up @@ -178,7 +179,9 @@ func newTestConnection(t *testing.T, url string) *Connection {
URL: url,
})
conn.Encoder = NewJSONEncoder(nil, false)
if err := conn.Connect(); err != nil {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
if err := conn.Connect(ctx); err != nil {
t.Fatalf("cannot connect to Elasticsearch: %s", err)
}

Expand Down
8 changes: 1 addition & 7 deletions libbeat/esleg/eslegclient/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func NewClients(cfg *cfg.C, beatname string) ([]Connection, error) {
}

// NewConnectedClient returns a non-thread-safe connection. Make sure you initialize a new connection for each goroutine.
func NewConnectedClient(cfg *cfg.C, beatname string) (*Connection, error) {
func NewConnectedClient(ctx context.Context, cfg *cfg.C, beatname string) (*Connection, error) {
clients, err := NewClients(cfg, beatname)
if err != nil {
return nil, err
Expand All @@ -260,20 +260,14 @@ func NewConnectedClient(cfg *cfg.C, beatname string) (*Connection, error) {
errors := []string{}

for _, client := range clients {
// client.Connect makes a call to get the ES version, so we create
// a context for those calls. Users of this Connection will have
// to call Connect again.
ctx, cancel := context.WithCancel(context.Background())
err = client.Connect(ctx)
if err != nil {
const errMsg = "error connecting to Elasticsearch at %v: %v"
client.log.Errorf(errMsg, client.URL, err)
err = fmt.Errorf(errMsg, client.URL, err)
errors = append(errors, err.Error())
cancel()
continue
}
cancel()
return &client, nil
}
return nil, fmt.Errorf("couldn't connect to any of the configured Elasticsearch hosts. Errors: %v", errors)
Expand Down
4 changes: 3 additions & 1 deletion libbeat/esleg/eslegclient/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ func TestUserAgentHeader(t *testing.T) {
testCase.connSettings.URL = server.URL
conn, err := NewConnection(testCase.connSettings)
require.NoError(t, err)
require.NoError(t, conn.Connect(), "conn.Connect must not return an error")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
require.NoError(t, conn.Connect(ctx), "conn.Connect must not return an error")
})
}
}
Expand Down
5 changes: 4 additions & 1 deletion packetbeat/beater/packetbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package beater

import (
"context"
"flag"
"fmt"
"sync"
Expand Down Expand Up @@ -111,7 +112,9 @@ func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) {
}
overwritePipelines = config.OverwritePipelines
b.OverwritePipelinesCallback = func(esConfig *conf.C) error {
esClient, err := eslegclient.NewConnectedClient(esConfig, "Packetbeat")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
esClient, err := eslegclient.NewConnectedClient(ctx, esConfig, "Packetbeat")
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion winlogbeat/beater/winlogbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ func (eb *Winlogbeat) init(b *beat.Beat) error {
}
b.OverwritePipelinesCallback = func(esConfig *conf.C) error {
overwritePipelines := config.OverwritePipelines
esClient, err := eslegclient.NewConnectedClient(esConfig, "Winlogbeat")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
esClient, err := eslegclient.NewConnectedClient(ctx, esConfig, "Winlogbeat")
if err != nil {
return err
}
Expand Down
5 changes: 4 additions & 1 deletion x-pack/winlogbeat/module/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package module

import (
"context"
"encoding/json"
"flag"
"fmt"
Expand Down Expand Up @@ -105,7 +106,9 @@ func testIngestPipeline(t *testing.T, pipeline, pattern string, p *params) {
}
defer conn.Close()

err = conn.Connect()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err = conn.Connect(ctx)
if err != nil {
t.Fatalf("unexpected error making connection: %v", err)
}
Expand Down
4 changes: 3 additions & 1 deletion x-pack/winlogbeat/module/wintest/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ func TestDocker(t *testing.T) {
}
defer conn.Close()

err = conn.Connect()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err = conn.Connect(ctx)
if err != nil {
t.Fatalf("unexpected error making connection: %v", err)
}
Expand Down
5 changes: 4 additions & 1 deletion x-pack/winlogbeat/module/wintest/simulate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package wintest_test

import (
"context"
"encoding/json"
"fmt"
"os"
Expand Down Expand Up @@ -72,7 +73,9 @@ func TestSimulate(t *testing.T) {
}
defer conn.Close()

err = conn.Connect()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err = conn.Connect(ctx)
if err != nil {
t.Fatalf("unexpected error making connection: %v", err)
}
Expand Down

0 comments on commit bfa6d6c

Please sign in to comment.