Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,4 @@ fleet.yml.old
pkg/component/fake/component/component
internal/pkg/agent/install/testblocking/testblocking
internal/pkg/otel/manager/testing/testing
pkg/core/process/testsignal/testsignal
3,206 changes: 1,165 additions & 2,041 deletions NOTICE-fips.txt

Large diffs are not rendered by default.

2,435 changes: 900 additions & 1,535 deletions NOTICE.txt

Large diffs are not rendered by default.

569 changes: 283 additions & 286 deletions go.mod

Large diffs are not rendered by default.

1,218 changes: 604 additions & 614 deletions go.sum

Large diffs are not rendered by default.

565 changes: 280 additions & 285 deletions internal/edot/go.mod

Large diffs are not rendered by default.

1,215 changes: 602 additions & 613 deletions internal/edot/go.sum

Large diffs are not rendered by default.

126 changes: 63 additions & 63 deletions internal/pkg/otel/README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion magefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -2258,7 +2258,7 @@ func (Integration) Local(ctx context.Context, testName string) error {
// run the integration tests but only run test that can run locally
params := devtools.DefaultGoTestIntegrationArgs()
params.Tags = append(params.Tags, "local")
params.Packages = []string{"github.com/elastic/elastic-agent/testing/integration"}
params.Packages = []string{"github.com/elastic/elastic-agent/testing/integration/..."}

var goTestFlags []string
rawTestFlags := os.Getenv("GOTEST_FLAGS")
Expand Down
90 changes: 90 additions & 0 deletions pkg/testing/fs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package testing

import (
"os"
"path/filepath"
"runtime"
"strings"
"testing"
"unicode"
"unicode/utf8"
)

// TempDir creates a temporary directory that will be
// removed if the tests passes. The temporary directory is
// created by joining all elements from path, with the sanitised
// test name.
//
// If path is empty, the temporary directory is created in os.TempDir.
//
// When tests are run with -v, the temporary directory absolute
// path will be logged.
func TempDir(t *testing.T, path ...string) string {
rootDir := filepath.Join(path...)

if rootDir == "" {
rootDir = os.TempDir()
}

rootDir, err := filepath.Abs(rootDir)
if err != nil {
t.Fatalf("cannot get absolute path: %s", err)
}

// Logic copied with small modifications from
// the Go source code: testing/testing.go
folderName := t.Name()
mapper := func(r rune) rune {
if r < utf8.RuneSelf {
const allowed = "_-"
if '0' <= r && r <= '9' ||
'a' <= r && r <= 'z' ||
'A' <= r && r <= 'Z' {
return r
}
if strings.ContainsRune(allowed, r) {
return r
}
} else if unicode.IsLetter(r) || unicode.IsNumber(r) {
return r
}
return -1
}
folderName = strings.Map(mapper, folderName)

if err := os.MkdirAll(rootDir, 0o750); err != nil {
t.Fatalf("error making test dir: %s: %s", rootDir, err)
}

tempDir, err := os.MkdirTemp(rootDir, folderName)
if err != nil {
t.Fatalf("failed to make temp directory: %s", err)
}

cleanup := func() {
if !t.Failed() {
if err := os.RemoveAll(tempDir); err != nil {
// Ungly workaround Windows limitations
// Windows does not support the Interrup signal, so it might
// happen that Filebeat is still running, keeping it's registry
// file open, thus preventing the temporary folder from being
// removed. So we log the error and move on without failing the
// test
if runtime.GOOS == "windows" {
t.Logf("[WARN] Could not remove temporatry directory '%s': %s", tempDir, err)
} else {
t.Errorf("could not remove temp dir '%s': %s", tempDir, err)
}
}
} else {
t.Logf("Temporary directory saved: %s", tempDir)
}
}
t.Cleanup(cleanup)

return tempDir
}
144 changes: 73 additions & 71 deletions testing/integration/ess/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,10 +975,12 @@ exporters:
- {{.ESEndpoint}}
api_key: {{.ESApiKey}}
logs_index: {{.Index}}
batcher:
enabled: true
flush_timeout: 1s
min_size: {{.MinItems}}
sending_queue:
wait_for_result: true # Avoid losing data on shutdown
block_on_overflow: true
batch:
flush_timeout: 1s
min_size: {{.MinItems}}
mapping:
mode: bodymap
service:
Expand Down Expand Up @@ -1398,10 +1400,12 @@ exporters:
- {{.ESEndpoint}}
api_key: {{.ESApiKey}}
logs_index: {{.Index}}
batcher:
enabled: true
flush_timeout: 1s
min_size: {{.MinItems}}
sending_queue:
wait_for_result: true # Avoid losing data on shutdown
block_on_overflow: true
batch:
flush_timeout: 1s
min_size: {{.MinItems}}
mapping:
mode: bodymap
service:
Expand Down Expand Up @@ -1590,9 +1594,11 @@ exporters:
compression: none
api_key: {{.ESApiKey}}
logs_index: {{.FBReceiverIndex}}
batcher:
enabled: true
flush_timeout: 1s
sending_queue:
wait_for_result: true # Avoid losing data on shutdown
block_on_overflow: true
batch:
flush_timeout: 1s
mapping:
mode: bodymap
service:
Expand Down Expand Up @@ -1772,7 +1778,8 @@ func TestFBOtelRestartE2E(t *testing.T) {
},
Stack: &define.Stack{},
})
tmpDir := t.TempDir()

tmpDir := aTesting.TempDir(t, "..", "..", "..", "build")

inputFile, err := os.CreateTemp(tmpDir, "input.txt")
require.NoError(t, err, "failed to create temp file to hold data to ingest")
Expand Down Expand Up @@ -1825,9 +1832,11 @@ exporters:
- {{.ESEndpoint}}
api_key: {{.ESApiKey}}
logs_index: {{.Index}}
batcher:
enabled: true
flush_timeout: 1s
sending_queue:
wait_for_result: true # Avoid losing data on shutdown
block_on_overflow: true
batch:
flush_timeout: 1s
mapping:
mode: bodymap
logs_dynamic_id:
Expand All @@ -1840,6 +1849,16 @@ service:
exporters:
- elasticsearch/log
#- debug
telemetry:
logs:
level: DEBUG
encoding: json
disable_stacktrace: true
# Save the logs in a file that is kept if the test fails
output_paths:
- {{.HomeDir}}/elastic-agent-logs.ndjson
error_output_paths:
- {{.HomeDir}}/elastic-agent-error-logs.ndjosn
`
otelConfigPath := filepath.Join(tmpDir, "otel.yml")
var otelConfigBuffer bytes.Buffer
Expand All @@ -1853,16 +1872,6 @@ service:
Index: index,
}))
require.NoError(t, os.WriteFile(otelConfigPath, otelConfigBuffer.Bytes(), 0o600))
t.Cleanup(func() {
if t.Failed() {
contents, err := os.ReadFile(otelConfigPath)
if err != nil {
t.Logf("No otel configuration file at %s", otelConfigPath)
return
}
t.Logf("Contents of otel config file:\n%s\n", string(contents))
}
})

fixture, err := define.NewFixtureFromLocalBuild(t, define.Version(), aTesting.WithAdditionalArgs([]string{"--config", otelConfigPath}))
require.NoError(t, err)
Expand Down Expand Up @@ -1892,17 +1901,6 @@ service:
assert.NoError(t, err, "failed to close input file")
}()

t.Cleanup(func() {
if t.Failed() {
contents, err := os.ReadFile(inputFilePath)
if err != nil {
t.Logf("no data file to import at %s", inputFilePath)
return
}
t.Logf("contents of import file:\n%s\n", string(contents))
}
})

// Start the collector, ingest some logs and then stop it
stoppedCh := make(chan int, 1)
fCtx, cancel := context.WithDeadline(ctx, time.Now().Add(1*time.Minute))
Expand All @@ -1915,19 +1913,23 @@ service:
close(stoppedCh)
}()

// Make sure we ingested at least 10 logs before stopping the collector
var hits int
require.Eventually(t, func() bool {
findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer findCancel()
require.EventuallyWithT(
t,
func(t *assert.CollectT) {
findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer findCancel()

docs, err := estools.GetLogsForIndexWithContext(findCtx, info.ESClient, ".ds-"+index+"*", map[string]interface{}{
"log.file.path": inputFilePath,
})
require.NoError(t, err)
hits += int(docs.Hits.Total.Value)
return hits >= 10
}, 1*time.Minute, 1*time.Second, "Expected to ingest at least 10 logs, got %d", hits)
docs, err := estools.GetLogsForIndexWithContext(findCtx, info.ESClient, ".ds-"+index+"*", map[string]any{
"log.file.path": inputFilePath,
})
require.NoError(t, err)
got := int(docs.Hits.Total.Value)

require.GreaterOrEqual(t, got, 10, "")
},
time.Minute,
time.Second,
"Expecting to ingest at least 10 logs")
cancel()

select {
Expand All @@ -1948,38 +1950,34 @@ service:
err = fixture.RunOtelWithClient(fCtx)
}()

// Make sure all the logs are ingested
actualHits := &struct {
Hits int
UniqueHits int
}{}
require.Eventually(t,
func() bool {
require.EventuallyWithT(
t,
func(t *assert.CollectT) {
findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer findCancel()

docs, err := estools.GetLogsForIndexWithContext(findCtx, info.ESClient, ".ds-"+index+"*", map[string]interface{}{
docs, err := estools.GetLogsForIndexWithContext(findCtx, info.ESClient, ".ds-"+index+"*", map[string]any{
"log.file.path": inputFilePath,
})
require.NoError(t, err)

actualHits.Hits = docs.Hits.Total.Value

uniqueIngestedLogs := make(map[string]struct{})
for _, hit := range docs.Hits.Hits {
t.Log("Hit: ", hit.Source["message"])
message, found := hit.Source["message"]
require.True(t, found, "expected message field in document %q", hit.Source)
msg, ok := message.(string)
require.True(t, ok, "expected message field to be a string, got %T", message)
require.NotContainsf(t, uniqueIngestedLogs, msg, "found duplicated log message %q", msg)
uniqueIngestedLogs[msg] = struct{}{}
}
actualHits.UniqueHits = len(uniqueIngestedLogs)
return actualHits.UniqueHits == int(inputLinesCounter.Load())

want := inputLinesCounter.Load()
got := docs.Hits.Total.Value
require.EqualValues(t, want, got, "expecting %d hits got %d hits", want, got)
},
20*time.Second, 1*time.Second,
"Expected %d logs, got %v", int(inputLinesCounter.Load()), actualHits)
20*time.Second,
time.Second,
"Did not find the expected number of logs")

cancel()
fixtureWg.Wait()
Expand Down Expand Up @@ -2062,10 +2060,12 @@ exporters:
- {{.ESEndpoint}}
api_key: {{.ESApiKey}}
logs_index: {{.Index}}
batcher:
enabled: true
flush_timeout: 1s
min_size: 1
sending_queue:
wait_for_result: true # Avoid losing data on shutdown
block_on_overflow: true
batch:
flush_timeout: 1s
min_size: 1
tls:
ca_file: {{ .CAFile }}
auth:
Expand Down Expand Up @@ -2208,10 +2208,12 @@ exporters:
- {{.ESEndpoint}}
api_key: {{.ESApiKey}}
logs_index: {{.Index}}
batcher:
enabled: true
flush_timeout: 1s
min_size: 1
sending_queue:
wait_for_result: true # Avoid losing data on shutdown
block_on_overflow: true
batch:
flush_timeout: 1s
min_size: 1
auth:
authenticator: beatsauth
mapping:
Expand Down
2 changes: 1 addition & 1 deletion wrapper/windows/archive-proxy/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ go 1.24.10

require github.com/elastic/elastic-agent v0.0.0

require golang.org/x/sys v0.36.0 // indirect
require golang.org/x/sys v0.38.0 // indirect

replace github.com/elastic/elastic-agent => ../../../
4 changes: 2 additions & 2 deletions wrapper/windows/archive-proxy/go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k=
golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=