Skip to content

Commit

Permalink
Remove the fleet reporter (#1130)
Browse files Browse the repository at this point in the history
* Remove the fleet reporter

Remove the fleet-reporter so that checkins no longer deliver the event
list.

* add CHANGELOG fix tests
  • Loading branch information
michel-laterman committed Sep 9, 2022
1 parent 802f27e commit 6e2e06c
Show file tree
Hide file tree
Showing 9 changed files with 16 additions and 524 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Expand Up @@ -114,6 +114,7 @@
- Agent updates will clean up unneeded artifacts. {issue}693[693] {issue}694[694] {pull}752[752]
- Use the Elastic Agent configuration directory as the root of the `inputs.d` folder. {issues}663[663]
- Fix a panic caused by a race condition when installing the Elastic Agent. {issues}806[806]
- Remove fleet event reporter and events from checkin body. {issue}993[993]

==== New features

Expand Down
15 changes: 0 additions & 15 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
Expand Up @@ -56,10 +56,6 @@ type agentInfo interface {
AgentID() string
}

type fleetReporter interface {
Events() ([]fleetapi.SerializableEvent, func())
}

type stateStore interface {
Add(fleetapi.Action)
AckToken() string
Expand All @@ -85,7 +81,6 @@ type fleetGateway struct {
backoff backoff.Backoff
settings *fleetGatewaySettings
agentInfo agentInfo
reporter fleetReporter
done chan struct{}
wg sync.WaitGroup
acker store.FleetAcker
Expand All @@ -104,7 +99,6 @@ func New(
agentInfo agentInfo,
client client.Sender,
d pipeline.Dispatcher,
r fleetReporter,
acker store.FleetAcker,
statusController status.Controller,
stateStore stateStore,
Expand All @@ -120,7 +114,6 @@ func New(
client,
d,
scheduler,
r,
acker,
statusController,
stateStore,
Expand All @@ -136,7 +129,6 @@ func newFleetGatewayWithScheduler(
client client.Sender,
d pipeline.Dispatcher,
scheduler scheduler.Scheduler,
r fleetReporter,
acker store.FleetAcker,
statusController status.Controller,
stateStore stateStore,
Expand All @@ -162,7 +154,6 @@ func newFleetGatewayWithScheduler(
settings.Backoff.Max,
),
done: done,
reporter: r,
acker: acker,
statusReporter: statusController.RegisterComponent("gateway"),
statusController: statusController,
Expand Down Expand Up @@ -323,9 +314,6 @@ func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) {
}

func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, error) {
// get events
ee, ack := f.reporter.Events()

ecsMeta, err := info.Metadata()
if err != nil {
f.log.Error(errors.New("failed to load metadata", err))
Expand All @@ -341,7 +329,6 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
cmd := fleetapi.NewCheckinCmd(f.agentInfo, f.client)
req := &fleetapi.CheckinRequest{
AckToken: ackToken,
Events: ee,
Metadata: ecsMeta,
Status: f.statusController.StatusString(),
}
Expand Down Expand Up @@ -374,8 +361,6 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
}
}

// ack events so they are dropped from queue
ack()
return resp, nil
}

Expand Down
54 changes: 3 additions & 51 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go
Expand Up @@ -18,7 +18,6 @@ import (
"testing"
"time"

"github.com/pkg/errors"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

Expand All @@ -29,9 +28,6 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/core/state"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
noopacker "github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/noop"
repo "github.com/elastic/elastic-agent/internal/pkg/reporter"
fleetreporter "github.com/elastic/elastic-agent/internal/pkg/reporter/fleet"
fleetreporterConfig "github.com/elastic/elastic-agent/internal/pkg/reporter/fleet/config"
"github.com/elastic/elastic-agent/internal/pkg/scheduler"
"github.com/elastic/elastic-agent/internal/pkg/testutils"
"github.com/elastic/elastic-agent/pkg/core/logger"
Expand Down Expand Up @@ -137,7 +133,7 @@ func (m *mockQueue) Actions() []fleetapi.Action {
return args.Get(0).([]fleetapi.Action)
}

type withGatewayFunc func(*testing.T, gateway.FleetGateway, *testingClient, *testingDispatcher, *scheduler.Stepper, repo.Backend)
type withGatewayFunc func(*testing.T, gateway.FleetGateway, *testingClient, *testingDispatcher, *scheduler.Stepper)

func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGatewayFunc) func(t *testing.T) {
return func(t *testing.T) {
Expand All @@ -146,8 +142,6 @@ func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGat
dispatcher := newTestingDispatcher()

log, _ := logger.New("fleet_gateway", false)
rep := getReporter(agentInfo, log, t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -167,7 +161,6 @@ func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGat
client,
dispatcher,
scheduler,
rep,
noopacker.NewAcker(),
&noopController{},
stateStore,
Expand All @@ -176,7 +169,7 @@ func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGat

require.NoError(t, err)

fn(t, gateway, client, dispatcher, scheduler, rep)
fn(t, gateway, client, dispatcher, scheduler)
}
}

Expand Down Expand Up @@ -214,7 +207,6 @@ func TestFleetGateway(t *testing.T) {
client *testingClient,
dispatcher *testingDispatcher,
scheduler *scheduler.Stepper,
rep repo.Backend,
) {
waitFn := ackSeq(
client.Answer(func(headers http.Header, body io.Reader) (*http.Response, error) {
Expand All @@ -240,7 +232,6 @@ func TestFleetGateway(t *testing.T) {
client *testingClient,
dispatcher *testingDispatcher,
scheduler *scheduler.Stepper,
rep repo.Backend,
) {
waitFn := ackSeq(
client.Answer(func(headers http.Header, body io.Reader) (*http.Response, error) {
Expand Down Expand Up @@ -305,7 +296,6 @@ func TestFleetGateway(t *testing.T) {
client,
dispatcher,
scheduler,
getReporter(agentInfo, log, t),
noopacker.NewAcker(),
&noopController{},
stateStore,
Expand Down Expand Up @@ -366,7 +356,6 @@ func TestFleetGateway(t *testing.T) {
client,
dispatcher,
scheduler,
getReporter(agentInfo, log, t),
noopacker.NewAcker(),
&noopController{},
stateStore,
Expand Down Expand Up @@ -432,7 +421,6 @@ func TestFleetGateway(t *testing.T) {
client,
dispatcher,
scheduler,
getReporter(agentInfo, log, t),
noopacker.NewAcker(),
&noopController{},
stateStore,
Expand Down Expand Up @@ -487,7 +475,6 @@ func TestFleetGateway(t *testing.T) {
client,
dispatcher,
scheduler,
getReporter(agentInfo, log, t),
noopacker.NewAcker(),
&noopController{},
stateStore,
Expand Down Expand Up @@ -544,7 +531,6 @@ func TestFleetGateway(t *testing.T) {
client,
dispatcher,
scheduler,
getReporter(agentInfo, log, t),
noopacker.NewAcker(),
&noopController{},
stateStore,
Expand Down Expand Up @@ -594,9 +580,7 @@ func TestFleetGateway(t *testing.T) {
client *testingClient,
dispatcher *testingDispatcher,
scheduler *scheduler.Stepper,
rep repo.Backend,
) {
_ = rep.Report(context.Background(), &testStateEvent{})
waitFn := ackSeq(
client.Answer(func(headers http.Header, body io.Reader) (*http.Response, error) {
cr := &request{}
Expand All @@ -609,8 +593,6 @@ func TestFleetGateway(t *testing.T) {
t.Fatal(err)
}

require.Equal(t, 1, len(cr.Events))

resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
return resp, nil
}),
Expand Down Expand Up @@ -657,7 +639,6 @@ func TestFleetGateway(t *testing.T) {
client,
dispatcher,
scheduler,
getReporter(agentInfo, log, t),
noopacker.NewAcker(),
&noopController{},
stateStore,
Expand Down Expand Up @@ -712,8 +693,6 @@ func TestRetriesOnFailures(t *testing.T) {
client := newTestingClient()
dispatcher := newTestingDispatcher()
log, _ := logger.New("fleet_gateway", false)
rep := getReporter(agentInfo, log, t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -742,7 +721,6 @@ func TestRetriesOnFailures(t *testing.T) {
client,
dispatcher,
scheduler,
rep,
noopacker.NewAcker(),
statusController,
stateStore,
Expand All @@ -757,8 +735,6 @@ func TestRetriesOnFailures(t *testing.T) {
err = gateway.Start()
require.NoError(t, err)

_ = rep.Report(context.Background(), &testStateEvent{})

// Initial tick is done out of bound so we can block on channels.
scheduler.Next()

Expand All @@ -780,8 +756,6 @@ func TestRetriesOnFailures(t *testing.T) {
t.Fatal(err)
}

require.Equal(t, 1, len(cr.Events))

resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
return resp, nil
}),
Expand All @@ -807,7 +781,6 @@ func TestRetriesOnFailures(t *testing.T) {
client *testingClient,
dispatcher *testingDispatcher,
scheduler *scheduler.Stepper,
rep repo.Backend,
) {
fail := func(_ http.Header, _ io.Reader) (*http.Response, error) {
return wrapStrToResp(http.StatusInternalServerError, "something is bad"), nil
Expand All @@ -816,8 +789,6 @@ func TestRetriesOnFailures(t *testing.T) {
err := gateway.Start()
require.NoError(t, err)

_ = rep.Report(context.Background(), &testStateEvent{})

// Initial tick is done out of bound so we can block on channels.
scheduler.Next()

Expand All @@ -830,27 +801,8 @@ func TestRetriesOnFailures(t *testing.T) {
}))
}

func getReporter(info agentInfo, log *logger.Logger, t *testing.T) *fleetreporter.Reporter {
fleetR, err := fleetreporter.NewReporter(info, log, fleetreporterConfig.DefaultConfig())
if err != nil {
t.Fatal(errors.Wrap(err, "fail to create reporters"))
}

return fleetR
}

type testAgentInfo struct{}

func (testAgentInfo) AgentID() string { return "agent-secret" }

type testStateEvent struct{}

func (testStateEvent) Type() string { return repo.EventTypeState }
func (testStateEvent) SubType() string { return repo.EventSubTypeInProgress }
func (testStateEvent) Time() time.Time { return time.Unix(0, 1) }
func (testStateEvent) Message() string { return "hello" }
func (testStateEvent) Payload() map[string]interface{} { return map[string]interface{}{"key": 1} }

type request struct {
Events []interface{} `json:"events"`
}
type request struct{}
9 changes: 1 addition & 8 deletions internal/pkg/agent/application/managed_mode.go
Expand Up @@ -44,7 +44,6 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
"github.com/elastic/elastic-agent/internal/pkg/queue"
reporting "github.com/elastic/elastic-agent/internal/pkg/reporter"
fleetreporter "github.com/elastic/elastic-agent/internal/pkg/reporter/fleet"
logreporter "github.com/elastic/elastic-agent/internal/pkg/reporter/log"
"github.com/elastic/elastic-agent/internal/pkg/sorted"
"github.com/elastic/elastic-agent/pkg/core/logger"
Expand Down Expand Up @@ -124,12 +123,7 @@ func newManaged(
}

logR := logreporter.NewReporter(log)
fleetR, err := fleetreporter.NewReporter(agentInfo, log, cfg.Fleet.Reporting)
if err != nil {
return nil, errors.New(err, "fail to create reporters")
}

combinedReporter := reporting.NewReporter(managedApplication.bgContext, log, agentInfo, logR, fleetR)
combinedReporter := reporting.NewReporter(managedApplication.bgContext, log, agentInfo, logR)
monitor, err := monitoring.NewMonitor(cfg.Settings)
if err != nil {
return nil, errors.New(err, "failed to initialize monitoring")
Expand Down Expand Up @@ -288,7 +282,6 @@ func newManaged(
agentInfo,
client,
actionDispatcher,
fleetR,
actionAcker,
statusCtrl,
stateStore,
Expand Down
19 changes: 8 additions & 11 deletions internal/pkg/agent/configuration/fleet.go
Expand Up @@ -7,18 +7,16 @@ package configuration
import (
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/remote"
fleetreporterConfig "github.com/elastic/elastic-agent/internal/pkg/reporter/fleet/config"
)

// FleetAgentConfig is the internal configuration of the agent after the enrollment is done,
// this configuration is not exposed in anyway in the elastic-agent.yml and is only internal configuration.
type FleetAgentConfig struct {
Enabled bool `config:"enabled" yaml:"enabled"`
AccessAPIKey string `config:"access_api_key" yaml:"access_api_key"`
Client remote.Config `config:",inline" yaml:",inline"`
Reporting *fleetreporterConfig.Config `config:"reporting" yaml:"reporting"`
Info *AgentInfo `config:"agent" yaml:"agent"`
Server *FleetServerConfig `config:"server" yaml:"server,omitempty"`
Enabled bool `config:"enabled" yaml:"enabled"`
AccessAPIKey string `config:"access_api_key" yaml:"access_api_key"`
Client remote.Config `config:",inline" yaml:",inline"`
Info *AgentInfo `config:"agent" yaml:"agent"`
Server *FleetServerConfig `config:"server" yaml:"server,omitempty"`
}

// Valid validates the required fields for accessing the API.
Expand All @@ -44,9 +42,8 @@ func (e *FleetAgentConfig) Valid() error {
// DefaultFleetAgentConfig creates a default configuration for fleet.
func DefaultFleetAgentConfig() *FleetAgentConfig {
return &FleetAgentConfig{
Enabled: false,
Client: remote.DefaultClientConfig(),
Reporting: fleetreporterConfig.DefaultConfig(),
Info: &AgentInfo{},
Enabled: false,
Client: remote.DefaultClientConfig(),
Info: &AgentInfo{},
}
}
7 changes: 3 additions & 4 deletions internal/pkg/fleetapi/checkin_cmd.go
Expand Up @@ -22,10 +22,9 @@ const checkingPath = "/api/fleet/agents/%s/checkin"

// CheckinRequest consists of multiple events reported to fleet ui.
type CheckinRequest struct {
Status string `json:"status"`
AckToken string `json:"ack_token,omitempty"`
Events []SerializableEvent `json:"events"`
Metadata *info.ECSMeta `json:"local_metadata,omitempty"`
Status string `json:"status"`
AckToken string `json:"ack_token,omitempty"`
Metadata *info.ECSMeta `json:"local_metadata,omitempty"`
}

// SerializableEvent is a representation of the event to be send to the Fleet Server API via the checkin
Expand Down

0 comments on commit 6e2e06c

Please sign in to comment.