Skip to content

Commit

Permalink
Make filestream benchmark more precise and more re-usable (#37334)
Browse files Browse the repository at this point in the history
* Now the initialization time is not included in the actual benchmark
timer
* The filestream runner is more re-usable now and can be used later
for regular tests not just benchmarks
  • Loading branch information
rdner authored Dec 7, 2023
1 parent 609817b commit a8d1567
Showing 1 changed file with 78 additions and 49 deletions.
127 changes: 78 additions & 49 deletions filebeat/input/filestream/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,70 @@ import (
"github.com/elastic/elastic-agent-libs/logp"
)

func BenchmarkFilestream(b *testing.B) {
logp.TestingSetup(logp.ToDiscardOutput())
lineCount := 10000
filename := generateFile(b, lineCount)

b.ResetTimer()

b.Run("filestream default throughput", func(b *testing.B) {
cfg := `
type: filestream
prospector.scanner.check_interval: 1s
paths:
- ` + filename + `
`
for i := 0; i < b.N; i++ {
runFilestreamBenchmark(b, fmt.Sprintf("default-benchmark-%d", i), cfg, lineCount)
}
})

b.Run("filestream fingerprint throughput", func(b *testing.B) {
cfg := `
type: filestream
prospector.scanner:
fingerprint.enabled: true
check_interval: 1s
file_identity.fingerprint: ~
paths:
- ` + filename + `
`
for i := 0; i < b.N; i++ {
runFilestreamBenchmark(b, fmt.Sprintf("fp-benchmark-%d", i), cfg, lineCount)
}
})
}

// runFilestreamBenchmark runs the entire filestream input with the in-memory registry and the test pipeline.
// `testID` must be unique for each test run
// `cfg` must be a valid YAML string containing valid filestream configuration
// `expEventCount` is an expected amount of produced events
func runFilestreamBenchmark(t testing.TB, testID string, cfg string, expEventCount int) {
func runFilestreamBenchmark(b *testing.B, testID string, cfg string, expEventCount int) {
// we don't include initialization in the benchmark time
b.StopTimer()
runner := createFilestreamTestRunner(b, testID, cfg, expEventCount)
// this is where the benchmark actually starts
b.StartTimer()
events := runner(b)
require.Len(b, events, expEventCount)
}

// createFilestreamTestRunner can be used for both benchmarks and regular tests to run a filestream input
// with the given configuration and event limit.
// `testID` must be unique for each test run
// `cfg` must be a valid YAML string containing valid filestream configuration
// `eventLimit` is an amount of produced events after which the filestream will shutdown
//
// returns a runner function that returns produced events.
func createFilestreamTestRunner(b testing.TB, testID string, cfg string, eventLimit int) func(t testing.TB) []beat.Event {
logger := logp.L()
c, err := conf.NewConfigWithYAML([]byte(cfg), cfg)
require.NoError(t, err)
require.NoError(b, err)

p := Plugin(logger, createTestStore(t))
p := Plugin(logger, createTestStore(b))
input, err := p.Manager.Create(c)
require.NoError(t, err)
require.NoError(b, err)

ctx, cancel := context.WithCancel(context.Background())
context := v2.Context{
Expand All @@ -56,17 +108,22 @@ func runFilestreamBenchmark(t testing.TB, testID string, cfg string, expEventCou
Cancelation: ctx,
}

connector, eventsDone := newTestPipeline(expEventCount)
events := make([]beat.Event, 0, eventLimit)
connector, eventsDone := newTestPipeline(eventLimit, &events)
done := make(chan struct{})
go func() {
err := input.Run(context, connector)
assert.NoError(t, err)
done <- struct{}{}
}()

<-eventsDone
cancel()
<-done // for more stable results we should wait until the full shutdown
return func(t testing.TB) []beat.Event {
go func() {
err := input.Run(context, connector)
assert.NoError(b, err)
close(done)
}()

<-eventsDone
cancel()
<-done // for more stable results we should wait until the full shutdown
return events
}
}

func generateFile(t testing.TB, lineCount int) string {
Expand All @@ -84,39 +141,6 @@ func generateFile(t testing.TB, lineCount int) string {
return filename
}

func BenchmarkFilestream(b *testing.B) {
logp.TestingSetup(logp.ToDiscardOutput())
lineCount := 10000
filename := generateFile(b, lineCount)

b.Run("filestream default throughput", func(b *testing.B) {
cfg := `
type: filestream
prospector.scanner.check_interval: 1s
paths:
- ` + filename + `
`
for i := 0; i < b.N; i++ {
runFilestreamBenchmark(b, fmt.Sprintf("default-benchmark-%d", i), cfg, lineCount)
}
})

b.Run("filestream fingerprint throughput", func(b *testing.B) {
cfg := `
type: filestream
prospector.scanner:
fingerprint.enabled: true
check_interval: 1s
file_identity.fingerprint: ~
paths:
- ` + filename + `
`
for i := 0; i < b.N; i++ {
runFilestreamBenchmark(b, fmt.Sprintf("fp-benchmark-%d", i), cfg, lineCount)
}
})
}

func createTestStore(t testing.TB) loginp.StateStore {
return &testStore{registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend())}
}
Expand All @@ -137,14 +161,15 @@ func (s *testStore) CleanupInterval() time.Duration {
return time.Second
}

func newTestPipeline(eventLimit int) (pc beat.PipelineConnector, done <-chan struct{}) {
func newTestPipeline(eventLimit int, out *[]beat.Event) (pc beat.PipelineConnector, done <-chan struct{}) {
ch := make(chan struct{})
return &testPipeline{limit: eventLimit, done: ch}, ch
return &testPipeline{limit: eventLimit, done: ch, out: out}, ch
}

type testPipeline struct {
done chan struct{}
limit int
out *[]beat.Event
}

func (p *testPipeline) ConnectWith(beat.ClientConfig) (beat.Client, error) {
Expand All @@ -160,8 +185,12 @@ type testClient struct {

func (c *testClient) Publish(event beat.Event) {
c.testPipeline.limit--
if c.testPipeline.limit < 0 {
return
}
*c.testPipeline.out = append(*c.testPipeline.out, event)
if c.testPipeline.limit == 0 {
c.testPipeline.done <- struct{}{}
close(c.testPipeline.done)
}
}

Expand Down

0 comments on commit a8d1567

Please sign in to comment.