Skip to content

Commit

Permalink
Reuse error channel (#2697)
Browse files Browse the repository at this point in the history
* Reuse error channel

* Update changelog/fragments/1686780651-reuse-error-channel.yaml

Co-authored-by: Jaime Soriano Pastor <jaime.soriano@elastic.co>

* Add log message to test trace server

* fix typo

* Intercept expected stop error

* linter fixes

* Update changelog/fragments/1686780651-reuse-error-channel.yaml

* remove commented code

---------

Co-authored-by: Jaime Soriano Pastor <jaime.soriano@elastic.co>
Co-authored-by: Julia Bardi <90178898+juliaElastic@users.noreply.github.com>
  • Loading branch information
3 people committed Jun 20, 2023
1 parent 3e635d2 commit 080d311
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 15 deletions.
32 changes: 32 additions & 0 deletions changelog/fragments/1686780651-reuse-error-channel.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; a 80ish characters long description of the change.
summary: Fix issue causing Fleet Server Agent to go offline after reboot

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; a word indicating the component this changeset affects.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
#pr: 2697

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: 2431
6 changes: 3 additions & 3 deletions internal/pkg/server/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (a *Agent) Run(ctx context.Context) error {
return
case err := <-a.agent.Errors():
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, io.EOF) {
log.Error().Err(err)
log.Error().Err(err).Msg("Agent wrapper received error.")
}
case change := <-a.agent.UnitChanges():
switch change.Type {
Expand All @@ -145,8 +145,8 @@ func (a *Agent) Run(ctx context.Context) error {
// Agent ID is not set for the component.
t.Stop()
err := a.reconfigure(subCtx)
if err != nil {
log.Error().Err(err)
if err != nil && !errors.Is(err, context.Canceled) {
log.Error().Err(err).Msg("Bootstrap error when reconfiguring")
}
}
}
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/server/agent_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func TestAgent(t *testing.T) {

// reconfigure with agent ID set
agentID := uuid.Must(uuid.NewV4()).String()
t.Logf("Generated new agentID: %s", agentID)
expected = makeExpected(agentID, 1, inputSource, 1, outputSource)
control.Expected(expected)

Expand Down
8 changes: 7 additions & 1 deletion internal/pkg/server/fleet.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ func (f *Fleet) Run(ctx context.Context, initCfg *config.Config) error {
)

started := false
ech := make(chan error, 2)

LOOP:
for {
ech := make(chan error, 2)
if started {
f.reporter.UpdateState(client.UnitStateConfiguring, "Re-configuring", nil) //nolint:errcheck // unclear on what should we do if updating the status fails?
} else {
Expand Down Expand Up @@ -177,6 +177,12 @@ LOOP:
if srvCancel != nil {
log.Info().Msg("stopping server on configuration change")
stop(srvCancel, srvEg)
select {
case err := <-ech:
log.Debug().Err(err).Msg("Server stopped intercepted expected context cancel error.")
case <-time.After(time.Second * 5):
log.Warn().Msg("Server stopped expected context cancel error missing.")
}
}
log.Info().Msg("starting server on configuration change")
srvEg, srvCancel = start(ctx, func(ctx context.Context, cfg *config.Config) error {
Expand Down
157 changes: 146 additions & 11 deletions internal/pkg/server/fleet_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
Expand All @@ -21,11 +22,13 @@ import (
"testing"
"time"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/go-elasticsearch/v8"
"github.com/gofrs/uuid"
"github.com/google/go-cmp/cmp"
"github.com/hashicorp/go-cleanhttp"
"github.com/rs/zerolog/log"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

Expand All @@ -44,6 +47,7 @@ import (

const (
serverVersion = "8.0.0"
localhost = "localhost"

testWaitServerUp = 3 * time.Second

Expand Down Expand Up @@ -81,10 +85,26 @@ func (s *tserver) baseURL() string {
}

func (s *tserver) waitExit() error {
return s.g.Wait()
err := s.g.Wait()
if errors.Is(err, context.Canceled) {
return nil
}
return err
}

func startTestServer(t *testing.T, ctx context.Context) (*tserver, error) {
type Option func(cfg *config.Config) error

func WithAPM(url string) Option {
return func(cfg *config.Config) error {
cfg.Inputs[0].Server.Instrumentation = config.Instrumentation{
Enabled: true,
Hosts: []string{url},
}
return nil
}
}

func startTestServer(t *testing.T, ctx context.Context, opts ...Option) (*tserver, error) {
t.Helper()

cfg, err := config.LoadFile("../testing/fleet-server-testing.yml")
Expand Down Expand Up @@ -164,11 +184,17 @@ func startTestServer(t *testing.T, ctx context.Context) (*tserver, error) {
srvcfg := &config.Server{}
srvcfg.InitDefaults()
srvcfg.Timeouts.CheckinMaxPoll = 2 * time.Minute // set to a short value for tests
srvcfg.Host = "localhost"
srvcfg.Host = localhost
srvcfg.Port = port
cfg.Inputs[0].Server = *srvcfg
log.Info().Uint16("port", port).Msg("Test fleet server")

for _, opt := range opts {
if err := opt(cfg); err != nil {
return nil, err
}
}

srv, err := NewFleet(build.Info{Version: serverVersion}, state.NewLog(), false)
if err != nil {
return nil, fmt.Errorf("unable to create server: %w", err)
Expand Down Expand Up @@ -226,6 +252,120 @@ func (s *tserver) buildURL(id string, cmd string) string {
return s.baseURL() + ur
}

type MockReporter struct {
mock.Mock
}

func (m *MockReporter) UpdateState(state client.UnitState, message string, payload map[string]interface{}) error {
args := m.Called(state, message, payload)
return args.Error(0)
}

func TestServerConfigErrorReload(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// don't use startTestServer as we need failing initial config.
cfg, err := config.LoadFile("../testing/fleet-server-testing.yml")
require.NoError(t, err)
newCfg, err := config.LoadFile("../testing/fleet-server-testing.yml")
require.NoError(t, err)

logger.Init(cfg, "fleet-server") //nolint:errcheck // test logging setup
bulker := ftesting.SetupBulk(ctx, t)

policyID := uuid.Must(uuid.NewV4()).String()
_, err = dl.CreatePolicy(ctx, bulker, model.Policy{
PolicyID: policyID,
RevisionIdx: 1,
DefaultFleetServer: true,
Data: policyData,
})
require.NoError(t, err)

// In order to create a functional enrollement token we need to use the ES endpoint to create a new api key
// then add the key (id/value) to the enrollment index
esCfg := elasticsearch.Config{
Username: "elastic",
Password: "changeme",
}
es, err := elasticsearch.NewClient(esCfg)
require.NoError(t, err)
key, err := apikey.Create(ctx, es, "default", "", "true", []byte(`{
"fleet-apikey-enroll": {
"cluster": [],
"index": [],
"applications": [{
"application": "fleet",
"privileges": ["no-privileges"],
"resources": ["*"]
}]
}
}`), map[string]interface{}{
"managed_by": "fleet",
"managed": true,
"type": "enroll",
"policy_id": policyID,
})
require.NoError(t, err)

_, err = dl.CreateEnrollmentAPIKey(ctx, bulker, model.EnrollmentAPIKey{
Name: "Default",
APIKey: key.Key,
APIKeyID: key.ID,
PolicyID: policyID,
Active: true,
})
require.NoError(t, err)
// sanity check
tokens, err := dl.FindEnrollmentAPIKeys(ctx, bulker, dl.QueryEnrollmentAPIKeyByPolicyID, dl.FieldPolicyID, policyID)
require.NoError(t, err)
require.NotZero(t, len(tokens), "no enrollment tokens found")

port, err := ftesting.FreePort()
require.NoError(t, err)

srvcfg := &config.Server{}
srvcfg.InitDefaults()
srvcfg.Timeouts.CheckinMaxPoll = 2 * time.Minute // set to a short value for tests
srvcfg.Host = localhost
srvcfg.Port = port
cfg.Inputs[0].Server = *srvcfg
newCfg.Inputs[0].Server = *srvcfg
cfg.HTTP.Enabled = false
newCfg.HTTP.Enabled = false
log.Info().Uint16("port", port).Msg("Test fleet server")

mReporter := &MockReporter{}
srv, err := NewFleet(build.Info{Version: serverVersion}, mReporter, false)
require.NoError(t, err)

mReporter.On("UpdateState", client.UnitStateStarting, mock.Anything, mock.Anything).Return(nil)
mReporter.On("UpdateState", client.UnitStateConfiguring, mock.Anything, mock.Anything).Return(nil)
mReporter.On("UpdateState", client.UnitStateHealthy, mock.Anything, mock.Anything).Run(func(_ mock.Arguments) {
// Call cancel to stop the server once it's healthy
cancel()
}).Return(nil)
mReporter.On("UpdateState", client.UnitStateStopping, mock.Anything, mock.Anything).Return(nil)

// set bad config
cfg.Output.Elasticsearch.ServiceToken = "incorrect"

// send good config
err = srv.Reload(ctx, newCfg)
require.NoError(t, err)

// Run server with the healthy reload
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
return srv.Run(ctx, cfg)
})

err = g.Wait()
require.NoError(t, err)
mReporter.AssertExpectations(t)
}

func TestServerUnauthorized(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -330,6 +470,7 @@ func TestServerInstrumentation(t *testing.T) {
tracerConnected := make(chan struct{}, 1)
tracerDisconnected := make(chan struct{}, 1)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
t.Logf("Tracing server received request to: %s", req.URL.Path)
if req.URL.Path != "/intake/v2/events" {
return
}
Expand All @@ -339,8 +480,8 @@ func TestServerInstrumentation(t *testing.T) {
}))
defer server.Close()

// Start test server
srv, err := startTestServer(t, ctx)
// Start test server with instrumentation
srv, err := startTestServer(t, ctx, WithAPM(server.URL))
require.NoError(t, err)

newInstrumentationCfg := func(cfg config.Config, instr config.Instrumentation) { //nolint:govet // mutex should not be copied in operation (hopefully)
Expand All @@ -352,12 +493,6 @@ func TestServerInstrumentation(t *testing.T) {
require.NoError(t, srv.srv.Reload(ctx, newCfg))
}

// Enable instrumentation
newInstrumentationCfg(*srv.cfg, config.Instrumentation{ //nolint:govet // mutex should not be copied in operation (hopefully)
Enabled: true,
Hosts: []string{server.URL},
})

stopClient := make(chan struct{})
cli := cleanhttp.DefaultClient()
callCheckinFunc := func() {
Expand Down

0 comments on commit 080d311

Please sign in to comment.