Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix drop processor for monitoring events #2982

Merged
merged 5 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
kind: bug-fix

summary: Fix drop processor for monitoring components
description: >-
It fixes the drop processor for monitoring component logs. Instead
of using the dataset, which does not include any information about
whether the component is a monitoring component, it now uses the
`component.id`.
component: elastic-agent

pr: https://github.com/elastic/elastic-agent/pull/2982

issue: https://github.com/elastic/elastic-agent/issues/2388
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is an extra changelog fragment... I don't see the related change and I think it's already been merged, right ?

Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
kind: enhancement

summary: mage `integration:local` now accepts test name to run

description: >-
The `mage integration:local` command now supports as an optional
argument the name of a test to run. This test name will be passed to
`go test` as `--run=<test name>`. If no argument is passed, `mage
integration:local` will run all local tests.

component: elastic-agent

pr: https://github.com/elastic/elastic-agent/pull/2993
29 changes: 9 additions & 20 deletions internal/pkg/agent/application/monitoring/v1_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,32 +322,21 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components []
},
},
},
"processors": []interface{}{
"processors": []any{
// drop all events from monitoring components (do it early)
// without dropping these events the filestream gets stuck in an infinite loop
// if filestream hits an issue publishing the events it logs an error which then filestream monitor
// will read from the logs and try to also publish that new log message (thus the infinite loop)
// will read from the logs and try to also publish that new log message (thus the infinite loop).
// The only way to identify a monitoring component by looking
// at their ID. They all end in `-monitoring`, e.g:
// - "beat/metrics-monitoring"
// - "filestream-monitoring"
// - "http/metrics-monitoring"
map[string]interface{}{
"drop_event": map[string]interface{}{
"when": map[string]interface{}{
"or": []interface{}{
map[string]interface{}{
"equals": map[string]interface{}{
"component.dataset": fmt.Sprintf("elastic_agent.filestream_%s", monitoringOutput),
},
},
// for consistency this monitor is also not shipped (fetch-able with diagnostics)
map[string]interface{}{
"equals": map[string]interface{}{
"component.dataset": fmt.Sprintf("elastic_agent.beats_metrics_%s", monitoringOutput),
},
},
// for consistency with this monitor is also not shipped (fetch-able with diagnostics)
map[string]interface{}{
"equals": map[string]interface{}{
"component.dataset": fmt.Sprintf("elastic_agent.http_metrics_%s", monitoringOutput),
},
},
"regexp": map[string]interface{}{
"component.id": ".*-monitoring$",
},
},
},
Expand Down
35 changes: 35 additions & 0 deletions pkg/testing/tools/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,41 @@ func GetLogsForDatastream(client elastictransport.Interface, index string) (Docu
return GetLogsForDatastreamWithContext(context.Background(), client, index)
}

// GetLogsForAgentID returns all log documents in the elastic_agent.*
// datasets produced by the Elastic Agent with the given agent ID.
func GetLogsForAgentID(client elastictransport.Interface, id string) (Documents, error) {
	// Both conditions live in one request-body `bool` query:
	//   - a `wildcard` query for the dataset: a `match` query on a keyword
	//     field such as `data_stream.dataset` would treat "elastic_agent.*"
	//     as a literal value and match nothing;
	//   - a `term` query for the agent ID: passing it through the `q=`
	//     query-string parameter is documented to override the body query,
	//     which would silently drop the dataset filter.
	indexQuery := map[string]interface{}{
		"query": map[string]interface{}{
			"bool": map[string]interface{}{
				"filter": []interface{}{
					map[string]interface{}{
						"wildcard": map[string]interface{}{
							"data_stream.dataset": "elastic_agent.*",
						},
					},
					map[string]interface{}{
						"term": map[string]interface{}{
							"elastic_agent.id": id,
						},
					},
				},
			},
		},
	}

	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(indexQuery); err != nil {
		return Documents{}, fmt.Errorf("error creating ES query: %w", err)
	}

	es := esapi.New(client)
	res, err := es.Search(
		es.Search.WithIndex("*.ds-logs*"),
		es.Search.WithExpandWildcards("all"),
		es.Search.WithBody(&buf),
		es.Search.WithTrackTotalHits(true),
		es.Search.WithPretty(),
		es.Search.WithContext(context.Background()),
		// magic number, we try to get all entries it helps debugging test failures
		es.Search.WithSize(300),
	)
	if err != nil {
		return Documents{}, fmt.Errorf("error performing ES search: %w", err)
	}

	return handleDocsResponse(res)
}

// GetLogsForDatastreamWithContext returns any logs associated with the datastream
func GetLogsForDatastreamWithContext(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) {
indexQuery := map[string]interface{}{
Expand Down
201 changes: 159 additions & 42 deletions testing/integration/enroll_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,20 @@
package integration

import (
"encoding/json"
"fmt"
"os"
"regexp"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/elastic/elastic-agent-libs/kibana"
atesting "github.com/elastic/elastic-agent/pkg/testing"
"github.com/elastic/elastic-agent/pkg/testing/define"
"github.com/elastic/elastic-agent/pkg/testing/tools"

"github.com/stretchr/testify/require"
)

func TestEnrollAndLog(t *testing.T) {
Expand All @@ -28,17 +33,110 @@ func TestEnrollAndLog(t *testing.T) {
Sudo: true,
})
t.Logf("got namespace: %s", info.Namespace)
suite.Run(t, &EnrollRunner{requirementsInfo: info})
}

// EnrollRunner is a testify suite that enrolls an Elastic Agent in Fleet
// and verifies the logs it ships to Elasticsearch.
type EnrollRunner struct {
suite.Suite
// requirementsInfo holds the test-environment info returned by define.Require.
requirementsInfo *define.Info
// agentFixture is the Elastic Agent fixture created once in SetupSuite.
agentFixture *atesting.Fixture
}

// SetupSuite creates the Elastic Agent fixture shared by all tests in the
// suite. It runs once, before any test method executes.
func (runner *EnrollRunner) SetupSuite() {
	runner.T().Logf("In SetupSuite")
	agentFixture, err := define.NewFixture(runner.T(), define.Version())
	// Fail fast before storing the fixture: on a non-nil error the fixture
	// is not usable, so checking the error first avoids keeping a broken
	// fixture on the suite.
	require.NoError(runner.T(), err)
	runner.agentFixture = agentFixture
}

t.Logf("In SetupSuite")
agentFixture, err := define.NewFixture(t, define.Version())
require.NoError(t, err)
func (runner *EnrollRunner) SetupTest() {}

// TestDropMonitoringLogs ensures logs from the monitoring components are not
// sent to the output
func (runner *EnrollRunner) TestDropMonitoringLogs() {
t := runner.T()
t.Logf("In TestDropMonitoringLogs")

defineInfo := runner.requirementsInfo
kibClient := runner.requirementsInfo.KibanaClient

// NOTE(review): the next two lines look like leftover diff context from
// TestEnroll (duplicate kibClient declaration, wrong log message) —
// confirm against the merged file.
t.Logf("In TestEnroll")
kibClient := info.KibanaClient
// Enroll agent in Fleet with a test policy
createPolicyReq := kibana.AgentPolicy{
Name: fmt.Sprintf("test-monitoring-logs-%d", time.Now().Unix()),
Namespace: "testdropmonitoringlogs",
Description: "test policy for drop processors",
// Enable both logs and metrics monitoring so monitoring components are
// actually started and could, if the drop processor were broken, emit
// documents ending up in the output.
MonitoringEnabled: []kibana.MonitoringEnabledOption{
kibana.MonitoringEnabledLogs,
kibana.MonitoringEnabledMetrics,
},
AgentFeatures: []map[string]interface{}{
{
"name": t.Name(),
"enabled": true,
},
},
}

// As part of the cleanup process, we'll uninstall the agent
policy, err := tools.InstallAgentWithPolicy(t, runner.agentFixture, kibClient, createPolicyReq)
require.NoError(t, err, "could not install Elastic Agent with Policy")
t.Logf("created policy: %s", policy.ID)

t.Cleanup(func() {
require.NoError(t, tools.UnEnrollAgent(kibClient), "could not un-enroll Elastic-Agent")
})

// NOTE(review): the concatenated log message below is missing a space
// between "and" and "Filebeat" — it prints "...some logs andFilebeat...".
t.Log("waiting 20s so the components can generate some logs and" +
"Filebeat can collect them")
time.Sleep(20 * time.Second)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not blocking but do we have an option besides Sleep? Any chance we could generate and read some kind of pre and post sentinel logs where we know a monitoring event would have been sent between the 2 of them?

t.Log("Done sleeping")

hostname, err := os.Hostname()
if err != nil {
t.Fatalf("could not get hostname to filter Agent: %s", err)
}

agentID, err := tools.GetAgentIDByHostname(defineInfo.KibanaClient, hostname)
require.NoError(t, err, "could not get Agent ID by hostname")
t.Logf("Agent ID: %q", agentID)

// We cannot search for `component.id` because at the moment of writing
// this field is not mapped. There is an issue for that:
// https://github.com/elastic/integrations/issues/6545
docs, err := tools.GetLogsForAgentID(defineInfo.ESClient, agentID)
require.NoError(t, err, "could not get logs from Agent ID: %q, err: %s",
agentID, err)

// Monitoring components are identified by their ID suffix, e.g.
// "filestream-monitoring" or "beat/metrics-monitoring".
monRegExp := regexp.MustCompile(".*-monitoring$")
for i, d := range docs.Hits.Hits {
// Lazy way to navigate a map[string]any: convert to JSON then
// decode into a struct.
jsonData, err := json.Marshal(d.Source)
if err != nil {
t.Fatalf("could not encode document source as JSON: %s", err)
}

doc := ESDocument{}
if err := json.Unmarshal(jsonData, &doc); err != nil {
t.Fatalf("could not unmarshal document source: %s", err)
}

// NOTE(review): the failure message names 'elastic_agent.id', but the
// value actually checked is 'component.id' — the message is misleading.
if monRegExp.MatchString(doc.Component.ID) {
t.Errorf("[%d] Document on index %q with 'component.id': %q "+
"and 'elastic_agent.id': %q. 'elastic_agent.id' must not "+
"end in '-monitoring'\n",
i, d.Index, doc.Component.ID, doc.ElasticAgent.ID)
}
}
}

func (runner *EnrollRunner) TestEnroll() {
runner.T().Logf("In TestEnroll")
kibClient := runner.requirementsInfo.KibanaClient
// Enroll agent in Fleet with a test policy
createPolicyReq := kibana.AgentPolicy{
Name: fmt.Sprintf("test-policy-enroll-%d", time.Now().Unix()),
Namespace: info.Namespace,
Namespace: "enrolltest",
Description: "test policy for agent enrollment",
MonitoringEnabled: []kibana.MonitoringEnabledOption{
kibana.MonitoringEnabledLogs,
Expand All @@ -53,59 +151,78 @@ func TestEnrollAndLog(t *testing.T) {
}
// Stage 1: Install
// As part of the cleanup process, we'll uninstall the agent
policy, err := tools.InstallAgentWithPolicy(t, agentFixture, kibClient, createPolicyReq)
require.NoError(t, err)
t.Logf("created policy: %s", policy.ID)

t.Cleanup(func() {
// After: unenroll
err = tools.UnEnrollAgent(info.KibanaClient)
require.NoError(t, err)
policy, err := tools.InstallAgentWithPolicy(runner.T(), runner.agentFixture, kibClient, createPolicyReq)
require.NoError(runner.T(), err)
runner.T().Logf("created policy: %s", policy.ID)

runner.T().Cleanup(func() {
//After: unenroll
err = tools.UnEnrollAgent(runner.requirementsInfo.KibanaClient)
require.NoError(runner.T(), err)
})

t.Logf("sleeping for one minute...")
runner.T().Logf("sleeping for one minute...")
time.Sleep(time.Second * 60)

// Stage 2: check indicies
// This is mostly for debugging
resp, err := tools.GetAllindicies(info.ESClient)
require.NoError(t, err)
resp, err := tools.GetAllindicies(runner.requirementsInfo.ESClient)
require.NoError(runner.T(), err)
for _, run := range resp {
t.Logf("%s: %d/%d deleted: %d\n", run.Index, run.DocsCount, run.StoreSizeBytes, run.DocsDeleted)
runner.T().Logf("%s: %d/%d deleted: %d\n", run.Index, run.DocsCount, run.StoreSizeBytes, run.DocsDeleted)
}

// Stage 3: Make sure metricbeat logs are populated
docs, err := tools.GetLogsForDatastream(info.ESClient, "elastic_agent.metricbeat")
require.NoError(t, err)
require.NotZero(t, len(docs.Hits.Hits))
t.Logf("metricbeat: Got %d documents", len(docs.Hits.Hits))
docs, err := tools.GetLogsForDatastream(runner.requirementsInfo.ESClient, "elastic_agent.metricbeat")
require.NoError(runner.T(), err)
require.NotZero(runner.T(), len(docs.Hits.Hits))
runner.T().Logf("metricbeat: Got %d documents", len(docs.Hits.Hits))

// Stage 4: Make sure filebeat logs are populated
docs, err = tools.GetLogsForDatastream(info.ESClient, "elastic_agent.filebeat")
require.NoError(t, err)
require.NotZero(t, len(docs.Hits.Hits))
t.Logf("Filebeat: Got %d documents", len(docs.Hits.Hits))
docs, err = tools.GetLogsForDatastream(runner.requirementsInfo.ESClient, "elastic_agent.filebeat")
require.NoError(runner.T(), err)
require.NotZero(runner.T(), len(docs.Hits.Hits))
runner.T().Logf("Filebeat: Got %d documents", len(docs.Hits.Hits))

// Stage 5: make sure we have no errors
docs, err = tools.CheckForErrorsInLogs(info.ESClient, []string{})
require.NoError(t, err)
t.Logf("errors: Got %d documents", len(docs.Hits.Hits))
docs, err = tools.CheckForErrorsInLogs(runner.requirementsInfo.ESClient, []string{})
require.NoError(runner.T(), err)
runner.T().Logf("errors: Got %d documents", len(docs.Hits.Hits))
for _, doc := range docs.Hits.Hits {
t.Logf("%#v", doc.Source)
runner.T().Logf("%#v", doc.Source)
}
require.Empty(t, docs.Hits.Hits)
require.Empty(runner.T(), docs.Hits.Hits)

// Stage 6: Make sure we have message confirming central management is running
docs, err = tools.FindMatchingLogLines(info.ESClient, "Parsed configuration and determined agent is managed by Fleet")
require.NoError(t, err)
require.NotZero(t, len(docs.Hits.Hits))
docs, err = tools.FindMatchingLogLines(runner.requirementsInfo.ESClient, "Parsed configuration and determined agent is managed by Fleet")
require.NoError(runner.T(), err)
require.NotZero(runner.T(), len(docs.Hits.Hits))

// Stage 7: check for starting messages
docs, err = tools.FindMatchingLogLines(info.ESClient, "metricbeat start running")
require.NoError(t, err)
require.NotZero(t, len(docs.Hits.Hits))
docs, err = tools.FindMatchingLogLines(runner.requirementsInfo.ESClient, "metricbeat start running")
require.NoError(runner.T(), err)
require.NotZero(runner.T(), len(docs.Hits.Hits))

docs, err = tools.FindMatchingLogLines(runner.requirementsInfo.ESClient, "filebeat start running")
require.NoError(runner.T(), err)
require.NotZero(runner.T(), len(docs.Hits.Hits))

docs, err = tools.FindMatchingLogLines(info.ESClient, "filebeat start running")
require.NoError(t, err)
require.NotZero(t, len(docs.Hits.Hits))
}

// ESDocument is the subset of an Elasticsearch log document's _source that
// the tests inspect when checking which component produced a log entry.
type ESDocument struct {
ElasticAgent ElasticAgent `json:"elastic_agent"`
Component Component `json:"component"`
Host Host `json:"host"`
}
// ElasticAgent mirrors the `elastic_agent` object of a log document:
// the identity of the agent that shipped the event.
type ElasticAgent struct {
ID string `json:"id"`
Version string `json:"version"`
Snapshot bool `json:"snapshot"`
}
// Component mirrors the `component` object of a log document; ID is the
// component identifier (monitoring components end in "-monitoring").
type Component struct {
Binary string `json:"binary"`
ID string `json:"id"`
}
// Host mirrors the `host` object of a log document.
type Host struct {
Hostname string `json:"hostname"`
}