Skip to content

Commit

Permalink
Fix drop processor for monitoring components
Browse files Browse the repository at this point in the history
It fixes the drop processor for monitoring component logs, instead
of using the dataset that does not include any information about
whether the component is a monitoring component it now uses the
`component.id`.
  • Loading branch information
belimawr committed Jul 5, 2023
1 parent 7e760c6 commit ad0ca46
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
kind: bug-fix

summary: Fix drop processor for monitoring components
description: >-
It fixes the drop processor for monitoring component logs, instead
of using the dataset that does not include any information about
whether the component is a monitoring component it now uses the
`component.id`.
component: elastic-agent

pr: https://github.com/elastic/elastic-agent/pull/2982

issue: https://github.com/elastic/elastic-agent/issues/2388
32 changes: 11 additions & 21 deletions internal/pkg/agent/application/monitoring/v1_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,32 +324,21 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components []
},
},
},
"processors": []interface{}{
"processors": []any{
// drop all events from monitoring components (do it early)
// without dropping these events the filestream gets stuck in an infinite loop
// if filestream hits an issue publishing the events it logs an error which then filestream monitor
// will read from the logs and try to also publish that new log message (thus the infinite loop)
// will read from the logs and try to also publish that new log message (thus the infinite loop).
// The only way to identify a monitoring component by looking
// at their ID. They all end in `-monitoring`, e.g:
// - "beat/metrics-monitoring"
// - "filestream-monitoring"
// - "http/metrics-monitoring"
map[string]interface{}{
"drop_event": map[string]interface{}{
"when": map[string]interface{}{
"or": []interface{}{
map[string]interface{}{
"equals": map[string]interface{}{
"component.dataset": fmt.Sprintf("elastic_agent.filestream_%s", monitoringOutput),
},
},
// for consistency this monitor is also not shipped (fetch-able with diagnostics)
map[string]interface{}{
"equals": map[string]interface{}{
"component.dataset": fmt.Sprintf("elastic_agent.beats_metrics_%s", monitoringOutput),
},
},
// for consistency with this monitor is also not shipped (fetch-able with diagnostics)
map[string]interface{}{
"equals": map[string]interface{}{
"component.dataset": fmt.Sprintf("elastic_agent.http_metrics_%s", monitoringOutput),
},
},
"regexp": map[string]interface{}{
"component.id": ".*-monitoring$",
},
},
},
Expand Down Expand Up @@ -432,7 +421,8 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components []
"add_formatted_index": map[string]interface{}{
"index": "%{[data_stream.type]}-%{[data_stream.dataset]}-%{[data_stream.namespace]}",
},
}},
},
},
},
}

Expand Down
35 changes: 35 additions & 0 deletions pkg/testing/tools/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,41 @@ func GetLogsForDatastream(client elastictransport.Interface, index string) (Docu
return GetLogsForDatastreamWithContext(context.Background(), client, index)
}

// GetLogsForAgentID returns any logs associated with the agent ID
func GetLogsForAgentID(client elastictransport.Interface, id string) (Documents, error) {
indexQuery := map[string]interface{}{
"query": map[string]interface{}{
"match": map[string]interface{}{
"data_stream.dataset": "elastic_agent.*",
},
},
}

var buf bytes.Buffer
err := json.NewEncoder(&buf).Encode(indexQuery)
if err != nil {
return Documents{}, fmt.Errorf("error creating ES query: %w", err)
}

es := esapi.New(client)
res, err := es.Search(
es.Search.WithIndex("*.ds-logs*"),
es.Search.WithExpandWildcards("all"),
es.Search.WithBody(&buf),
es.Search.WithTrackTotalHits(true),
es.Search.WithPretty(),
es.Search.WithContext(context.Background()),
es.Search.WithQuery(fmt.Sprintf(`elastic_agent.id:%s`, id)),
// magic number, we try to get all entries it helps debugging test failures
es.Search.WithSize(300),
)
if err != nil {
return Documents{}, fmt.Errorf("error performing ES search: %w", err)
}

return handleDocsResponse(res)
}

// GetLogsForDatastreamWithContext returns any logs associated with the datastream
func GetLogsForDatastreamWithContext(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) {
indexQuery := map[string]interface{}{
Expand Down
104 changes: 100 additions & 4 deletions testing/integration/enroll_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,20 @@
package integration

import (
"encoding/json"
"fmt"
"os"
"regexp"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/elastic/elastic-agent-libs/kibana"
atesting "github.com/elastic/elastic-agent/pkg/testing"
"github.com/elastic/elastic-agent/pkg/testing/define"
"github.com/elastic/elastic-agent/pkg/testing/tools"

"github.com/stretchr/testify/require"
)

func TestEnrollAndLog(t *testing.T) {
Expand All @@ -27,7 +29,7 @@ func TestEnrollAndLog(t *testing.T) {
{Type: define.Linux},
},
Stack: &define.Stack{},
Local: false,
Local: true,
Sudo: true,
})
t.Logf("got namespace: %s", info.Namespace)
Expand All @@ -47,8 +49,84 @@ func (runner *EnrollRunner) SetupSuite() {
require.NoError(runner.T(), err)
}

func (runner *EnrollRunner) SetupTest() {
func (runner *EnrollRunner) SetupTest() {}

// TestDropMonitoringLogs ensures logs from the monitoring components are not
// sent to the output
func (runner *EnrollRunner) TestDropMonitoringLogs() {
t := runner.T()
t.Logf("In TestDropMonitoringLogs")

defineInfo := runner.requirementsInfo
kibClient := runner.requirementsInfo.KibanaClient

// Enroll agent in Fleet with a test policy
createPolicyReq := kibana.AgentPolicy{
Name: fmt.Sprintf("test-monitoring-logs-%d", time.Now().Unix()),
Namespace: "testdropmonitoringlogs",
Description: "test policy for drop processors",
MonitoringEnabled: []kibana.MonitoringEnabledOption{
kibana.MonitoringEnabledLogs,
kibana.MonitoringEnabledMetrics,
},
AgentFeatures: []map[string]interface{}{
{
"name": t.Name(),
"enabled": true,
},
},
}

// As part of the cleanup process, we'll uninstall the agent
policy, err := tools.InstallAgentWithPolicy(t, runner.agentFixture, kibClient, createPolicyReq)
require.NoError(t, err, "could not install Elastic Agent with Policy")
t.Logf("created policy: %s", policy.ID)

t.Cleanup(func() {
require.NoError(t, tools.UnEnrollAgent(kibClient), "could not un-enroll Elastic-Agent")
})

t.Log("waiting 20s so the components can generate some logs and" +
"Filebeat can collect them")
time.Sleep(20 * time.Second)
t.Log("Done sleeping")

hostname, err := os.Hostname()
if err != nil {
t.Fatalf("could not get hostname to filter Agent: %s", err)
}

agentID, err := tools.GetAgentIDByHostname(defineInfo.KibanaClient, hostname)

Check failure on line 99 in testing/integration/enroll_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

ineffectual assignment to err (ineffassign)
t.Logf("Agent ID: %q", agentID)

// We cannot search for `component.id` because at the moment of writing
// this field is not mapped. There is an issue for that:
// https://github.com/elastic/integrations/issues/6545
docs, err := tools.GetLogsForAgentID(defineInfo.ESClient, agentID)
require.NoError(t, err, "could not get logs from Agent ID: %q, err: %s",
agentID, err)

monRegExp := regexp.MustCompile(".*-monitoring$")
for i, d := range docs.Hits.Hits {
// Lazy way to navigate a map[string]any: convert to JSON then
// decode into a struct.
jsonData, err := json.Marshal(d.Source)
if err != nil {
t.Fatalf("could not encode document source as JSON: %s", err)
}

doc := ESDocument{}
if err := json.Unmarshal(jsonData, &doc); err != nil {
t.Fatalf("could not unmarshal document source: %s", err)
}

if monRegExp.MatchString(doc.Component.ID) {
t.Errorf("[%d] Document on index %q with 'component.id': %q "+
"and 'elastic_agent.id': %q. 'elastic_agent.id' must not "+
"end in '-monitoring'\n",
i, d.Index, doc.Component.ID, doc.ElasticAgent.ID)
}
}
}

func (runner *EnrollRunner) TestEnroll() {
Expand Down Expand Up @@ -129,3 +207,21 @@ func (runner *EnrollRunner) TestEnroll() {
require.NotZero(runner.T(), len(docs.Hits.Hits))

}

type ESDocument struct {
ElasticAgent ElasticAgent `json:"elastic_agent"`
Component Component `json:"component"`
Host Host `json:"host"`
}
type ElasticAgent struct {
ID string `json:"id"`
Version string `json:"version"`
Snapshot bool `json:"snapshot"`
}
type Component struct {
Binary string `json:"binary"`
ID string `json:"id"`
}
type Host struct {
Hostname string `json:"hostname"`
}

0 comments on commit ad0ca46

Please sign in to comment.