Skip to content

Commit

Permalink
Fix drop processor for monitoring components
Browse files Browse the repository at this point in the history
Fix the drop processor for monitoring component logs. Instead of
matching on the dataset, which does not include any information about
whether the component is a monitoring component, the processor now
matches on `component.id`.
  • Loading branch information
belimawr committed Jul 7, 2023
1 parent ad4b12d commit 85f208a
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 62 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
kind: bug-fix

summary: Fix drop processor for monitoring components
description: >-
Fix the drop processor for monitoring component logs. Instead of
matching on the dataset, which does not include any information about
whether the component is a monitoring component, the processor now
matches on `component.id`.
component: elastic-agent

pr: https://github.com/elastic/elastic-agent/pull/2982

issue: https://github.com/elastic/elastic-agent/issues/2388
29 changes: 9 additions & 20 deletions internal/pkg/agent/application/monitoring/v1_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,32 +322,21 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components []
},
},
},
"processors": []interface{}{
"processors": []any{
// drop all events from monitoring components (do it early)
// without dropping these events the filestream gets stuck in an infinite loop
// if filestream hits an issue publishing the events it logs an error which then filestream monitor
// will read from the logs and try to also publish that new log message (thus the infinite loop)
// will read from the logs and try to also publish that new log message (thus the infinite loop).
// The only way to identify a monitoring component is by looking
// at its ID. They all end in `-monitoring`, e.g.:
// - "beat/metrics-monitoring"
// - "filestream-monitoring"
// - "http/metrics-monitoring"
map[string]interface{}{
"drop_event": map[string]interface{}{
"when": map[string]interface{}{
"or": []interface{}{
map[string]interface{}{
"equals": map[string]interface{}{
"component.dataset": fmt.Sprintf("elastic_agent.filestream_%s", monitoringOutput),
},
},
// for consistency this monitor is also not shipped (fetch-able with diagnostics)
map[string]interface{}{
"equals": map[string]interface{}{
"component.dataset": fmt.Sprintf("elastic_agent.beats_metrics_%s", monitoringOutput),
},
},
// for consistency with this monitor is also not shipped (fetch-able with diagnostics)
map[string]interface{}{
"equals": map[string]interface{}{
"component.dataset": fmt.Sprintf("elastic_agent.http_metrics_%s", monitoringOutput),
},
},
"regexp": map[string]interface{}{
"component.id": ".*-monitoring$",
},
},
},
Expand Down
35 changes: 35 additions & 0 deletions pkg/testing/tools/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,41 @@ func GetLogsForDatastream(client elastictransport.Interface, index string) (Docu
return GetLogsForDatastreamWithContext(context.Background(), client, index)
}

// GetLogsForAgentID searches the logs data streams for documents whose
// `elastic_agent.id` matches id and returns at most 300 of them.
//
// NOTE(review): the request carries both a query-string (`q`) parameter and a
// request body. The Elasticsearch search API documents that `q` takes
// precedence over the body query, so the `data_stream.dataset` match built
// below may have no effect — confirm before relying on it.
func GetLogsForAgentID(client elastictransport.Interface, id string) (Documents, error) {
	datasetMatch := map[string]interface{}{
		"data_stream.dataset": "elastic_agent.*",
	}
	searchBody := map[string]interface{}{
		"query": map[string]interface{}{
			"match": datasetMatch,
		},
	}

	var payload bytes.Buffer
	if err := json.NewEncoder(&payload).Encode(searchBody); err != nil {
		return Documents{}, fmt.Errorf("error creating ES query: %w", err)
	}

	// Magic number: we try to get all entries, which helps when debugging
	// test failures.
	const maxHits = 300
	agentFilter := fmt.Sprintf(`elastic_agent.id:%s`, id)

	api := esapi.New(client)
	resp, err := api.Search(
		api.Search.WithIndex("*.ds-logs*"),
		api.Search.WithExpandWildcards("all"),
		api.Search.WithBody(&payload),
		api.Search.WithTrackTotalHits(true),
		api.Search.WithPretty(),
		api.Search.WithContext(context.Background()),
		api.Search.WithQuery(agentFilter),
		api.Search.WithSize(maxHits),
	)
	if err != nil {
		return Documents{}, fmt.Errorf("error performing ES search: %w", err)
	}

	return handleDocsResponse(resp)
}

// GetLogsForDatastreamWithContext returns any logs associated with the datastream
func GetLogsForDatastreamWithContext(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) {
indexQuery := map[string]interface{}{
Expand Down
201 changes: 159 additions & 42 deletions testing/integration/enroll_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,20 @@
package integration

import (
"encoding/json"
"fmt"
"os"
"regexp"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/elastic/elastic-agent-libs/kibana"
atesting "github.com/elastic/elastic-agent/pkg/testing"
"github.com/elastic/elastic-agent/pkg/testing/define"
"github.com/elastic/elastic-agent/pkg/testing/tools"

"github.com/stretchr/testify/require"
)

func TestEnrollAndLog(t *testing.T) {
Expand All @@ -28,17 +33,110 @@ func TestEnrollAndLog(t *testing.T) {
Sudo: true,
})
t.Logf("got namespace: %s", info.Namespace)
suite.Run(t, &EnrollRunner{requirementsInfo: info})
}

// EnrollRunner is the testify suite started by TestEnrollAndLog. It carries
// the state shared by the suite's test methods.
type EnrollRunner struct {
suite.Suite
// requirementsInfo is the define.Info handed over by TestEnrollAndLog; the
// tests read their Kibana and Elasticsearch clients from it.
requirementsInfo *define.Info
// agentFixture is the Elastic Agent fixture created in SetupSuite.
agentFixture *atesting.Fixture
}

// SetupSuite creates the Elastic Agent fixture used by the tests in this
// suite. The fixture is stored on the runner only after its creation has been
// verified, so a failed setup never publishes an unchecked value.
func (runner *EnrollRunner) SetupSuite() {
	t := runner.T()
	t.Logf("In SetupSuite")

	fixture, err := define.NewFixture(t, define.Version())
	require.NoError(t, err)
	runner.agentFixture = fixture
}

t.Logf("In SetupSuite")
agentFixture, err := define.NewFixture(t, define.Version())
require.NoError(t, err)
// SetupTest is intentionally a no-op: the tests in this suite perform their
// own per-test setup inline.
func (runner *EnrollRunner) SetupTest() {}

// TestDropMonitoringLogs ensures logs from the monitoring components are not
// sent to the output.
//
// It installs an agent enrolled with a policy that has log and metric
// monitoring enabled, waits for logs to be shipped, fetches the documents
// indexed for this agent and reports an error for every document whose
// `component.id` ends in `-monitoring`.
func (runner *EnrollRunner) TestDropMonitoringLogs() {
	t := runner.T()
	t.Logf("In TestDropMonitoringLogs")

	defineInfo := runner.requirementsInfo
	kibClient := defineInfo.KibanaClient

	// Enroll agent in Fleet with a test policy
	createPolicyReq := kibana.AgentPolicy{
		Name:        fmt.Sprintf("test-monitoring-logs-%d", time.Now().Unix()),
		Namespace:   "testdropmonitoringlogs",
		Description: "test policy for drop processors",
		MonitoringEnabled: []kibana.MonitoringEnabledOption{
			kibana.MonitoringEnabledLogs,
			kibana.MonitoringEnabledMetrics,
		},
		AgentFeatures: []map[string]interface{}{
			{
				"name":    t.Name(),
				"enabled": true,
			},
		},
	}

	// As part of the cleanup process, we'll uninstall the agent
	policy, err := tools.InstallAgentWithPolicy(t, runner.agentFixture, kibClient, createPolicyReq)
	require.NoError(t, err, "could not install Elastic Agent with Policy")
	t.Logf("created policy: %s", policy.ID)

	t.Cleanup(func() {
		require.NoError(t, tools.UnEnrollAgent(kibClient), "could not un-enroll Elastic-Agent")
	})

	t.Log("waiting 20s so the components can generate some logs and " +
		"Filebeat can collect them")
	time.Sleep(20 * time.Second)
	t.Log("Done sleeping")

	hostname, err := os.Hostname()
	if err != nil {
		t.Fatalf("could not get hostname to filter Agent: %s", err)
	}

	agentID, err := tools.GetAgentIDByHostname(kibClient, hostname)
	require.NoError(t, err, "could not get Agent ID by hostname")
	t.Logf("Agent ID: %q", agentID)

	// We cannot search for `component.id` because at the moment of writing
	// this field is not mapped. There is an issue for that:
	// https://github.com/elastic/integrations/issues/6545
	// So we fetch the agent's documents and filter them on the client side.
	docs, err := tools.GetLogsForAgentID(defineInfo.ESClient, agentID)
	require.NoError(t, err, "could not get logs from Agent ID: %q, err: %s",
		agentID, err)

	// Same pattern the drop_event processor uses to identify monitoring
	// components.
	monRegExp := regexp.MustCompile(".*-monitoring$")
	for i, d := range docs.Hits.Hits {
		// Lazy way to navigate a map[string]any: convert to JSON then
		// decode into a struct.
		jsonData, err := json.Marshal(d.Source)
		if err != nil {
			t.Fatalf("could not encode document source as JSON: %s", err)
		}

		doc := ESDocument{}
		if err := json.Unmarshal(jsonData, &doc); err != nil {
			t.Fatalf("could not unmarshal document source: %s", err)
		}

		if monRegExp.MatchString(doc.Component.ID) {
			t.Errorf("[%d] Document on index %q with 'component.id': %q "+
				"and 'elastic_agent.id': %q. 'component.id' must not "+
				"end in '-monitoring'\n",
				i, d.Index, doc.Component.ID, doc.ElasticAgent.ID)
		}
	}
}

func (runner *EnrollRunner) TestEnroll() {
runner.T().Logf("In TestEnroll")
kibClient := runner.requirementsInfo.KibanaClient
// Enroll agent in Fleet with a test policy
createPolicyReq := kibana.AgentPolicy{
Name: fmt.Sprintf("test-policy-enroll-%d", time.Now().Unix()),
Namespace: info.Namespace,
Namespace: "enrolltest",
Description: "test policy for agent enrollment",
MonitoringEnabled: []kibana.MonitoringEnabledOption{
kibana.MonitoringEnabledLogs,
Expand All @@ -53,59 +151,78 @@ func TestEnrollAndLog(t *testing.T) {
}
// Stage 1: Install
// As part of the cleanup process, we'll uninstall the agent
policy, err := tools.InstallAgentWithPolicy(t, agentFixture, kibClient, createPolicyReq)
require.NoError(t, err)
t.Logf("created policy: %s", policy.ID)

t.Cleanup(func() {
// After: unenroll
err = tools.UnEnrollAgent(info.KibanaClient)
require.NoError(t, err)
policy, err := tools.InstallAgentWithPolicy(runner.T(), runner.agentFixture, kibClient, createPolicyReq)
require.NoError(runner.T(), err)
runner.T().Logf("created policy: %s", policy.ID)

runner.T().Cleanup(func() {
//After: unenroll
err = tools.UnEnrollAgent(runner.requirementsInfo.KibanaClient)
require.NoError(runner.T(), err)
})

t.Logf("sleeping for one minute...")
runner.T().Logf("sleeping for one minute...")
time.Sleep(time.Second * 60)

// Stage 2: check indicies
// This is mostly for debugging
resp, err := tools.GetAllindicies(info.ESClient)
require.NoError(t, err)
resp, err := tools.GetAllindicies(runner.requirementsInfo.ESClient)
require.NoError(runner.T(), err)
for _, run := range resp {
t.Logf("%s: %d/%d deleted: %d\n", run.Index, run.DocsCount, run.StoreSizeBytes, run.DocsDeleted)
runner.T().Logf("%s: %d/%d deleted: %d\n", run.Index, run.DocsCount, run.StoreSizeBytes, run.DocsDeleted)
}

// Stage 3: Make sure metricbeat logs are populated
docs, err := tools.GetLogsForDatastream(info.ESClient, "elastic_agent.metricbeat")
require.NoError(t, err)
require.NotZero(t, len(docs.Hits.Hits))
t.Logf("metricbeat: Got %d documents", len(docs.Hits.Hits))
docs, err := tools.GetLogsForDatastream(runner.requirementsInfo.ESClient, "elastic_agent.metricbeat")
require.NoError(runner.T(), err)
require.NotZero(runner.T(), len(docs.Hits.Hits))
runner.T().Logf("metricbeat: Got %d documents", len(docs.Hits.Hits))

// Stage 4: Make sure filebeat logs are populated
docs, err = tools.GetLogsForDatastream(info.ESClient, "elastic_agent.filebeat")
require.NoError(t, err)
require.NotZero(t, len(docs.Hits.Hits))
t.Logf("Filebeat: Got %d documents", len(docs.Hits.Hits))
docs, err = tools.GetLogsForDatastream(runner.requirementsInfo.ESClient, "elastic_agent.filebeat")
require.NoError(runner.T(), err)
require.NotZero(runner.T(), len(docs.Hits.Hits))
runner.T().Logf("Filebeat: Got %d documents", len(docs.Hits.Hits))

// Stage 5: make sure we have no errors
docs, err = tools.CheckForErrorsInLogs(info.ESClient, []string{})
require.NoError(t, err)
t.Logf("errors: Got %d documents", len(docs.Hits.Hits))
docs, err = tools.CheckForErrorsInLogs(runner.requirementsInfo.ESClient, []string{})
require.NoError(runner.T(), err)
runner.T().Logf("errors: Got %d documents", len(docs.Hits.Hits))
for _, doc := range docs.Hits.Hits {
t.Logf("%#v", doc.Source)
runner.T().Logf("%#v", doc.Source)
}
require.Empty(t, docs.Hits.Hits)
require.Empty(runner.T(), docs.Hits.Hits)

// Stage 6: Make sure we have message confirming central management is running
docs, err = tools.FindMatchingLogLines(info.ESClient, "Parsed configuration and determined agent is managed by Fleet")
require.NoError(t, err)
require.NotZero(t, len(docs.Hits.Hits))
docs, err = tools.FindMatchingLogLines(runner.requirementsInfo.ESClient, "Parsed configuration and determined agent is managed by Fleet")
require.NoError(runner.T(), err)
require.NotZero(runner.T(), len(docs.Hits.Hits))

// Stage 7: check for starting messages
docs, err = tools.FindMatchingLogLines(info.ESClient, "metricbeat start running")
require.NoError(t, err)
require.NotZero(t, len(docs.Hits.Hits))
docs, err = tools.FindMatchingLogLines(runner.requirementsInfo.ESClient, "metricbeat start running")
require.NoError(runner.T(), err)
require.NotZero(runner.T(), len(docs.Hits.Hits))

docs, err = tools.FindMatchingLogLines(runner.requirementsInfo.ESClient, "filebeat start running")
require.NoError(runner.T(), err)
require.NotZero(runner.T(), len(docs.Hits.Hits))

docs, err = tools.FindMatchingLogLines(info.ESClient, "filebeat start running")
require.NoError(t, err)
require.NotZero(t, len(docs.Hits.Hits))
}

// Typed views over the parts of an Elasticsearch document source that the
// integration tests inspect. Fields absent from the JSON keep their zero
// value when decoding.
type (
	// ElasticAgent mirrors the `elastic_agent` object of a document.
	ElasticAgent struct {
		ID       string `json:"id"`
		Version  string `json:"version"`
		Snapshot bool   `json:"snapshot"`
	}

	// Component mirrors the `component` object of a document.
	Component struct {
		Binary string `json:"binary"`
		ID     string `json:"id"`
	}

	// Host mirrors the `host` object of a document.
	Host struct {
		Hostname string `json:"hostname"`
	}

	// ESDocument is the subset of a document source the tests decode:
	// the agent, the component that produced the event, and the host.
	ESDocument struct {
		ElasticAgent ElasticAgent `json:"elastic_agent"`
		Component    Component    `json:"component"`
		Host         Host         `json:"host"`
	}
)

0 comments on commit 85f208a

Please sign in to comment.