Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions internal/benchrunner/runners/pipeline/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package pipeline

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -75,9 +76,9 @@ func (p BenchmarkValue) String() (r string) {
return r
}

func (r *runner) benchmarkPipeline(b *benchmark, entryPipeline string) (*BenchmarkResult, error) {
func (r *runner) benchmarkPipeline(ctx context.Context, b *benchmark, entryPipeline string) (*BenchmarkResult, error) {
// Run benchmark
bench, err := r.benchmarkIngest(b, entryPipeline)
bench, err := r.benchmarkIngest(ctx, b, entryPipeline)
if err != nil {
return nil, fmt.Errorf("failed running benchmark: %w", err)
}
Expand Down Expand Up @@ -196,9 +197,9 @@ type ingestResult struct {
numDocs int
}

func (r *runner) benchmarkIngest(b *benchmark, entryPipeline string) (ingestResult, error) {
func (r *runner) benchmarkIngest(ctx context.Context, b *benchmark, entryPipeline string) (ingestResult, error) {
baseDocs := resizeDocs(b.events, b.config.NumDocs)
return r.runSingleBenchmark(entryPipeline, baseDocs)
return r.runSingleBenchmark(ctx, entryPipeline, baseDocs)
}

type processorPerformance struct {
Expand Down Expand Up @@ -298,12 +299,12 @@ func (agg aggregation) collect(fn mapFn) ([]BenchmarkValue, error) {
return r, nil
}

func (r *runner) runSingleBenchmark(entryPipeline string, docs []json.RawMessage) (ingestResult, error) {
func (r *runner) runSingleBenchmark(ctx context.Context, entryPipeline string, docs []json.RawMessage) (ingestResult, error) {
if len(docs) == 0 {
return ingestResult{}, errors.New("no docs supplied for benchmark")
}

if _, err := ingest.SimulatePipeline(r.options.API, entryPipeline, docs, "test-generic-default"); err != nil {
if _, err := ingest.SimulatePipeline(ctx, r.options.API, entryPipeline, docs, "test-generic-default"); err != nil {
return ingestResult{}, fmt.Errorf("simulate failed: %w", err)
}

Expand Down
8 changes: 4 additions & 4 deletions internal/benchrunner/runners/pipeline/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,24 +58,24 @@ func (r *runner) SetUp(ctx context.Context) error {

// TearDown shuts down the pipeline benchmark runner.
func (r *runner) TearDown(ctx context.Context) error {
if err := ingest.UninstallPipelines(r.options.API, r.pipelines); err != nil {
if err := ingest.UninstallPipelines(ctx, r.options.API, r.pipelines); err != nil {
return fmt.Errorf("uninstalling ingest pipelines failed: %w", err)
}
return nil
}

// Run runs the pipeline benchmarks defined under the given folder
func (r *runner) Run(ctx context.Context) (reporters.Reportable, error) {
return r.run()
return r.run(ctx)
}

func (r *runner) run() (reporters.Reportable, error) {
func (r *runner) run(ctx context.Context) (reporters.Reportable, error) {
b, err := r.loadBenchmark()
if err != nil {
return nil, fmt.Errorf("loading benchmark failed: %w", err)
}

benchmark, err := r.benchmarkPipeline(b, r.entryPipeline)
benchmark, err := r.benchmarkPipeline(ctx, b, r.entryPipeline)
if err != nil {
return nil, err
}
Expand Down
39 changes: 25 additions & 14 deletions internal/benchrunner/runners/rally/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,13 +319,13 @@ func (r *runner) setUp(ctx context.Context) error {
dataStreamManifest.Name,
)

r.indexTemplateBody, err = r.extractSimulatedTemplate(indexTemplate)
r.indexTemplateBody, err = r.extractSimulatedTemplate(ctx, indexTemplate)
if err != nil {
return fmt.Errorf("error extracting routing path: %s: %w", indexTemplate, err)
}
}

if err := r.wipeDataStreamOnSetup(); err != nil {
if err := r.wipeDataStreamOnSetup(ctx); err != nil {
return fmt.Errorf("error deleting old data in data stream: %s: %w", r.runtimeDataStream, err)
}

Expand All @@ -343,8 +343,11 @@ func (r *runner) setUp(ctx context.Context) error {
return nil
}

func (r *runner) extractSimulatedTemplate(indexTemplate string) (string, error) {
simulateTemplate, err := r.options.ESAPI.Indices.SimulateTemplate(r.options.ESAPI.Indices.SimulateTemplate.WithName(indexTemplate))
func (r *runner) extractSimulatedTemplate(ctx context.Context, indexTemplate string) (string, error) {
simulateTemplate, err := r.options.ESAPI.Indices.SimulateTemplate(
r.options.ESAPI.Indices.SimulateTemplate.WithContext(ctx),
r.options.ESAPI.Indices.SimulateTemplate.WithName(indexTemplate),
)
if err != nil {
return "", fmt.Errorf("error simulating template from composable template: %s: %w", indexTemplate, err)
}
Expand Down Expand Up @@ -384,18 +387,18 @@ func (r *runner) extractSimulatedTemplate(indexTemplate string) (string, error)
return string(newTemplate), nil
}

func (r *runner) wipeDataStreamOnSetup() error {
func (r *runner) wipeDataStreamOnSetup(ctx context.Context) error {
// Delete old data
logger.Debug("deleting old data in data stream...")
r.wipeDataStreamHandler = func(ctx context.Context) error {
logger.Debugf("deleting data in data stream...")
if err := r.deleteDataStreamDocs(r.runtimeDataStream); err != nil {
if err := r.deleteDataStreamDocs(ctx, r.runtimeDataStream); err != nil {
return fmt.Errorf("error deleting data in data stream: %w", err)
}
return nil
}

return r.deleteDataStreamDocs(r.runtimeDataStream)
return r.deleteDataStreamDocs(ctx, r.runtimeDataStream)
}

func (r *runner) run(ctx context.Context) (report reporters.Reportable, err error) {
Expand Down Expand Up @@ -443,7 +446,7 @@ func (r *runner) run(ctx context.Context) (report reporters.Reportable, err erro
return nil, fmt.Errorf("can't summarize metrics: %w", err)
}

if err := r.reindexData(); err != nil {
if err := r.reindexData(ctx); err != nil {
return nil, fmt.Errorf("can't reindex data: %w", err)
}

Expand Down Expand Up @@ -529,9 +532,11 @@ func (r *runner) collectAndSummarizeMetrics() (*metricsSummary, error) {
return sum, err
}

func (r *runner) deleteDataStreamDocs(dataStream string) error {
func (r *runner) deleteDataStreamDocs(ctx context.Context, dataStream string) error {
body := strings.NewReader(`{ "query": { "match_all": {} } }`)
resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body)
resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body,
r.options.ESAPI.DeleteByQuery.WithContext(ctx),
)
if err != nil {
return fmt.Errorf("failed to delete data stream docs for data stream %s: %w", dataStream, err)
}
Expand Down Expand Up @@ -941,7 +946,7 @@ func (r *runner) runRally(ctx context.Context) ([]rallyStat, error) {
}

// reindexData will read all data generated during the benchmark and will reindex it to the metrisctore
func (r *runner) reindexData() error {
func (r *runner) reindexData(ctx context.Context) error {
if !r.options.ReindexData {
return nil
}
Expand All @@ -954,6 +959,7 @@ func (r *runner) reindexData() error {
logger.Debug("getting orignal mappings...")
// Get the mapping from the source data stream
mappingRes, err := r.options.ESAPI.Indices.GetMapping(
r.options.ESAPI.Indices.GetMapping.WithContext(ctx),
r.options.ESAPI.Indices.GetMapping.WithIndex(r.runtimeDataStream),
)
if err != nil {
Expand Down Expand Up @@ -999,6 +1005,7 @@ func (r *runner) reindexData() error {

createRes, err := r.options.ESMetricsAPI.Indices.Create(
indexName,
r.options.ESMetricsAPI.Indices.Create.WithContext(ctx),
r.options.ESMetricsAPI.Indices.Create.WithBody(reader),
)
if err != nil {
Expand All @@ -1014,6 +1021,7 @@ func (r *runner) reindexData() error {

logger.Debug("starting scrolling of events...")
res, err := r.options.ESAPI.Search(
r.options.ESAPI.Search.WithContext(ctx),
r.options.ESAPI.Search.WithIndex(r.runtimeDataStream),
r.options.ESAPI.Search.WithBody(bodyReader),
r.options.ESAPI.Search.WithScroll(time.Minute),
Expand Down Expand Up @@ -1042,7 +1050,7 @@ func (r *runner) reindexData() error {
break
}

err := r.bulkMetrics(indexName, sr)
err := r.bulkMetrics(ctx, indexName, sr)
if err != nil {
return err
}
Expand All @@ -1063,7 +1071,7 @@ type searchResponse struct {
} `json:"hits"`
}

func (r *runner) bulkMetrics(indexName string, sr searchResponse) error {
func (r *runner) bulkMetrics(ctx context.Context, indexName string, sr searchResponse) error {
var bulkBodyBuilder strings.Builder
for _, hit := range sr.Hits {
bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.ID))
Expand All @@ -1077,7 +1085,9 @@ func (r *runner) bulkMetrics(indexName string, sr searchResponse) error {

logger.Debugf("bulk request of %d events...", len(sr.Hits))

resp, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String()))
resp, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String()),
r.options.ESMetricsAPI.Bulk.WithContext(ctx),
)
if err != nil {
return fmt.Errorf("error performing the bulk index request: %w", err)
}
Expand All @@ -1091,6 +1101,7 @@ func (r *runner) bulkMetrics(indexName string, sr searchResponse) error {
}

resp, err = r.options.ESAPI.Scroll(
r.options.ESAPI.Scroll.WithContext(ctx),
r.options.ESAPI.Scroll.WithScrollID(sr.ScrollID),
r.options.ESAPI.Scroll.WithScroll(time.Minute),
)
Expand Down
25 changes: 15 additions & 10 deletions internal/benchrunner/runners/stream/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (r *runner) setUp(ctx context.Context) error {
return nil
}

if err := r.wipeDataStreamsOnSetup(); err != nil {
if err := r.wipeDataStreamsOnSetup(ctx); err != nil {
return fmt.Errorf("error cleaning up old data in data streams: %w", err)
}

Expand All @@ -224,21 +224,21 @@ func (r *runner) setUp(ctx context.Context) error {
return nil
}

func (r *runner) wipeDataStreamsOnSetup() error {
func (r *runner) wipeDataStreamsOnSetup(ctx context.Context) error {
// Delete old data
logger.Debug("deleting old data in data stream...")
r.wipeDataStreamHandler = func(ctx context.Context) error {
logger.Debugf("deleting data in data stream...")
for _, runtimeDataStream := range r.runtimeDataStreams {
if err := r.deleteDataStreamDocs(runtimeDataStream); err != nil {
if err := r.deleteDataStreamDocs(ctx, runtimeDataStream); err != nil {
return fmt.Errorf("error deleting data in data stream: %w", err)
}
}
return nil
}

for _, runtimeDataStream := range r.runtimeDataStreams {
if err := r.deleteDataStreamDocs(runtimeDataStream); err != nil {
if err := r.deleteDataStreamDocs(ctx, runtimeDataStream); err != nil {
return fmt.Errorf("error deleting data in data stream: %w", err)
}
}
Expand Down Expand Up @@ -278,9 +278,11 @@ func (r *runner) installPackageFromPackageRoot(ctx context.Context) error {
return nil
}

func (r *runner) deleteDataStreamDocs(dataStream string) error {
func (r *runner) deleteDataStreamDocs(ctx context.Context, dataStream string) error {
body := strings.NewReader(`{ "query": { "match_all": {} } }`)
resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body)
resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body,
r.options.ESAPI.DeleteByQuery.WithContext(ctx),
)
if err != nil {
return fmt.Errorf("failed to delete data stream docs for data stream %s: %w", dataStream, err)
}
Expand Down Expand Up @@ -484,8 +486,11 @@ func (r *runner) collectBulkRequestBody(indexName, scenarioName string, buf *byt
return bulkBodyBuilder, nil
}

func (r *runner) performBulkRequest(bulkRequest string) error {
resp, err := r.options.ESAPI.Bulk(strings.NewReader(bulkRequest))
func (r *runner) performBulkRequest(ctx context.Context, bulkRequest string) error {
resp, err := r.options.ESAPI.Bulk(strings.NewReader(bulkRequest),
r.options.ESAPI.Bulk.WithContext(ctx),
)

if err != nil {
return err
}
Expand Down Expand Up @@ -595,7 +600,7 @@ func (r *runner) runStreamGenerator(ctx context.Context, scenarioName string) er
}
}

err := r.performBulkRequest(bulkBodyBuilder.String())
err := r.performBulkRequest(ctx, bulkBodyBuilder.String())
if err != nil {
return fmt.Errorf("error performing bulk request: %w", err)
}
Expand Down Expand Up @@ -628,7 +633,7 @@ func (r *runner) runBackfillGenerator(ctx context.Context, scenarioName string)
}
}

return r.performBulkRequest(bulkBodyBuilder.String())
return r.performBulkRequest(ctx, bulkBodyBuilder.String())
}

type benchMeta struct {
Expand Down
Loading