diff --git a/internal/benchrunner/runners/pipeline/benchmark.go b/internal/benchrunner/runners/pipeline/benchmark.go index 0b3de8b946..2784dc1b23 100644 --- a/internal/benchrunner/runners/pipeline/benchmark.go +++ b/internal/benchrunner/runners/pipeline/benchmark.go @@ -5,6 +5,7 @@ package pipeline import ( + "context" "encoding/json" "errors" "fmt" @@ -75,9 +76,9 @@ func (p BenchmarkValue) String() (r string) { return r } -func (r *runner) benchmarkPipeline(b *benchmark, entryPipeline string) (*BenchmarkResult, error) { +func (r *runner) benchmarkPipeline(ctx context.Context, b *benchmark, entryPipeline string) (*BenchmarkResult, error) { // Run benchmark - bench, err := r.benchmarkIngest(b, entryPipeline) + bench, err := r.benchmarkIngest(ctx, b, entryPipeline) if err != nil { return nil, fmt.Errorf("failed running benchmark: %w", err) } @@ -196,9 +197,9 @@ type ingestResult struct { numDocs int } -func (r *runner) benchmarkIngest(b *benchmark, entryPipeline string) (ingestResult, error) { +func (r *runner) benchmarkIngest(ctx context.Context, b *benchmark, entryPipeline string) (ingestResult, error) { baseDocs := resizeDocs(b.events, b.config.NumDocs) - return r.runSingleBenchmark(entryPipeline, baseDocs) + return r.runSingleBenchmark(ctx, entryPipeline, baseDocs) } type processorPerformance struct { @@ -298,12 +299,12 @@ func (agg aggregation) collect(fn mapFn) ([]BenchmarkValue, error) { return r, nil } -func (r *runner) runSingleBenchmark(entryPipeline string, docs []json.RawMessage) (ingestResult, error) { +func (r *runner) runSingleBenchmark(ctx context.Context, entryPipeline string, docs []json.RawMessage) (ingestResult, error) { if len(docs) == 0 { return ingestResult{}, errors.New("no docs supplied for benchmark") } - if _, err := ingest.SimulatePipeline(r.options.API, entryPipeline, docs, "test-generic-default"); err != nil { + if _, err := ingest.SimulatePipeline(ctx, r.options.API, entryPipeline, docs, "test-generic-default"); err != nil { return ingestResult{}, fmt.Errorf("simulate failed: %w", err) } diff --git a/internal/benchrunner/runners/pipeline/runner.go b/internal/benchrunner/runners/pipeline/runner.go index bc917c22be..c43c47e05b 100644 --- a/internal/benchrunner/runners/pipeline/runner.go +++ b/internal/benchrunner/runners/pipeline/runner.go @@ -58,7 +58,7 @@ func (r *runner) SetUp(ctx context.Context) error { // TearDown shuts down the pipeline benchmark runner. func (r *runner) TearDown(ctx context.Context) error { - if err := ingest.UninstallPipelines(r.options.API, r.pipelines); err != nil { + if err := ingest.UninstallPipelines(ctx, r.options.API, r.pipelines); err != nil { return fmt.Errorf("uninstalling ingest pipelines failed: %w", err) } return nil @@ -66,16 +66,16 @@ func (r *runner) TearDown(ctx context.Context) error { // Run runs the pipeline benchmarks defined under the given folder func (r *runner) Run(ctx context.Context) (reporters.Reportable, error) { - return r.run() + return r.run(ctx) } -func (r *runner) run() (reporters.Reportable, error) { +func (r *runner) run(ctx context.Context) (reporters.Reportable, error) { b, err := r.loadBenchmark() if err != nil { return nil, fmt.Errorf("loading benchmark failed: %w", err) } - benchmark, err := r.benchmarkPipeline(b, r.entryPipeline) + benchmark, err := r.benchmarkPipeline(ctx, b, r.entryPipeline) if err != nil { return nil, err } diff --git a/internal/benchrunner/runners/rally/runner.go b/internal/benchrunner/runners/rally/runner.go index e936b402aa..e4b9e3799a 100644 --- a/internal/benchrunner/runners/rally/runner.go +++ b/internal/benchrunner/runners/rally/runner.go @@ -319,13 +319,13 @@ func (r *runner) setUp(ctx context.Context) error { dataStreamManifest.Name, ) - r.indexTemplateBody, err = r.extractSimulatedTemplate(indexTemplate) + r.indexTemplateBody, err = r.extractSimulatedTemplate(ctx, indexTemplate) if err != nil { return fmt.Errorf("error extracting routing path: %s: %w", indexTemplate, err) } } - if err := r.wipeDataStreamOnSetup(); err != nil { + if err := r.wipeDataStreamOnSetup(ctx); err != nil { return fmt.Errorf("error deleting old data in data stream: %s: %w", r.runtimeDataStream, err) } @@ -343,8 +343,11 @@ func (r *runner) setUp(ctx context.Context) error { return nil } -func (r *runner) extractSimulatedTemplate(indexTemplate string) (string, error) { - simulateTemplate, err := r.options.ESAPI.Indices.SimulateTemplate(r.options.ESAPI.Indices.SimulateTemplate.WithName(indexTemplate)) +func (r *runner) extractSimulatedTemplate(ctx context.Context, indexTemplate string) (string, error) { + simulateTemplate, err := r.options.ESAPI.Indices.SimulateTemplate( + r.options.ESAPI.Indices.SimulateTemplate.WithContext(ctx), + r.options.ESAPI.Indices.SimulateTemplate.WithName(indexTemplate), + ) if err != nil { return "", fmt.Errorf("error simulating template from composable template: %s: %w", indexTemplate, err) } @@ -384,18 +387,18 @@ func (r *runner) extractSimulatedTemplate(indexTemplate string) (string, error) return string(newTemplate), nil } -func (r *runner) wipeDataStreamOnSetup() error { +func (r *runner) wipeDataStreamOnSetup(ctx context.Context) error { // Delete old data logger.Debug("deleting old data in data stream...") r.wipeDataStreamHandler = func(ctx context.Context) error { logger.Debugf("deleting data in data stream...") - if err := r.deleteDataStreamDocs(r.runtimeDataStream); err != nil { + if err := r.deleteDataStreamDocs(ctx, r.runtimeDataStream); err != nil { return fmt.Errorf("error deleting data in data stream: %w", err) } return nil } - return r.deleteDataStreamDocs(r.runtimeDataStream) + return r.deleteDataStreamDocs(ctx, r.runtimeDataStream) } func (r *runner) run(ctx context.Context) (report reporters.Reportable, err error) { @@ -443,7 +446,7 @@ func (r *runner) run(ctx context.Context) (report reporters.Reportable, err erro return nil, fmt.Errorf("can't summarize metrics: %w", err) } - if err := r.reindexData(); err != nil { + if err := r.reindexData(ctx); err != nil { return nil, fmt.Errorf("can't reindex data: %w", err) } @@ -529,9 +532,11 @@ func (r *runner) collectAndSummarizeMetrics() (*metricsSummary, error) { return sum, err } -func (r *runner) deleteDataStreamDocs(dataStream string) error { +func (r *runner) deleteDataStreamDocs(ctx context.Context, dataStream string) error { body := strings.NewReader(`{ "query": { "match_all": {} } }`) - resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body) + resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body, + r.options.ESAPI.DeleteByQuery.WithContext(ctx), + ) if err != nil { return fmt.Errorf("failed to delete data stream docs for data stream %s: %w", dataStream, err) } @@ -941,7 +946,7 @@ func (r *runner) runRally(ctx context.Context) ([]rallyStat, error) { } // reindexData will read all data generated during the benchmark and will reindex it to the metrisctore -func (r *runner) reindexData() error { +func (r *runner) reindexData(ctx context.Context) error { if !r.options.ReindexData { return nil } @@ -954,6 +959,7 @@ func (r *runner) reindexData() error { logger.Debug("getting orignal mappings...") // Get the mapping from the source data stream mappingRes, err := r.options.ESAPI.Indices.GetMapping( + r.options.ESAPI.Indices.GetMapping.WithContext(ctx), r.options.ESAPI.Indices.GetMapping.WithIndex(r.runtimeDataStream), ) if err != nil { @@ -999,6 +1005,7 @@ func (r *runner) reindexData() error { createRes, err := r.options.ESMetricsAPI.Indices.Create( indexName, + r.options.ESMetricsAPI.Indices.Create.WithContext(ctx), r.options.ESMetricsAPI.Indices.Create.WithBody(reader), ) if err != nil { @@ -1014,6 +1021,7 @@ func (r *runner) reindexData() error { logger.Debug("starting scrolling of events...") res, err := r.options.ESAPI.Search( + r.options.ESAPI.Search.WithContext(ctx), r.options.ESAPI.Search.WithIndex(r.runtimeDataStream), r.options.ESAPI.Search.WithBody(bodyReader), r.options.ESAPI.Search.WithScroll(time.Minute), @@ -1042,7 +1050,7 @@ func (r *runner) reindexData() error { break } - err := r.bulkMetrics(indexName, sr) + err := r.bulkMetrics(ctx, indexName, sr) if err != nil { return err } @@ -1063,7 +1071,7 @@ type searchResponse struct { } `json:"hits"` } -func (r *runner) bulkMetrics(indexName string, sr searchResponse) error { +func (r *runner) bulkMetrics(ctx context.Context, indexName string, sr searchResponse) error { var bulkBodyBuilder strings.Builder for _, hit := range sr.Hits { bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.ID)) @@ -1077,7 +1085,9 @@ func (r *runner) bulkMetrics(indexName string, sr searchResponse) error { logger.Debugf("bulk request of %d events...", len(sr.Hits)) - resp, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String())) + resp, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String()), + r.options.ESMetricsAPI.Bulk.WithContext(ctx), + ) if err != nil { return fmt.Errorf("error performing the bulk index request: %w", err) } @@ -1091,6 +1101,7 @@ func (r *runner) bulkMetrics(indexName string, sr searchResponse) error { } resp, err = r.options.ESAPI.Scroll( + r.options.ESAPI.Scroll.WithContext(ctx), r.options.ESAPI.Scroll.WithScrollID(sr.ScrollID), r.options.ESAPI.Scroll.WithScroll(time.Minute), ) diff --git a/internal/benchrunner/runners/stream/runner.go b/internal/benchrunner/runners/stream/runner.go index e7c33b0608..a372209479 100644 --- a/internal/benchrunner/runners/stream/runner.go +++ b/internal/benchrunner/runners/stream/runner.go @@ -199,7 +199,7 @@ func (r *runner) setUp(ctx context.Context) error { return nil } - if err := r.wipeDataStreamsOnSetup(); err != nil { + if err := r.wipeDataStreamsOnSetup(ctx); err != nil { return fmt.Errorf("error cleaning up old data in data streams: %w", err) } @@ -224,13 +224,13 @@ func (r *runner) setUp(ctx context.Context) error { return nil } -func (r *runner) wipeDataStreamsOnSetup() error { +func (r *runner) wipeDataStreamsOnSetup(ctx context.Context) error { // Delete old data logger.Debug("deleting old data in data stream...") r.wipeDataStreamHandler = func(ctx context.Context) error { logger.Debugf("deleting data in data stream...") for _, runtimeDataStream := range r.runtimeDataStreams { - if err := r.deleteDataStreamDocs(runtimeDataStream); err != nil { + if err := r.deleteDataStreamDocs(ctx, runtimeDataStream); err != nil { return fmt.Errorf("error deleting data in data stream: %w", err) } } @@ -238,7 +238,7 @@ func (r *runner) wipeDataStreamsOnSetup() error { } for _, runtimeDataStream := range r.runtimeDataStreams { - if err := r.deleteDataStreamDocs(runtimeDataStream); err != nil { + if err := r.deleteDataStreamDocs(ctx, runtimeDataStream); err != nil { return fmt.Errorf("error deleting data in data stream: %w", err) } } @@ -278,9 +278,11 @@ func (r *runner) installPackageFromPackageRoot(ctx context.Context) error { return nil } -func (r *runner) deleteDataStreamDocs(dataStream string) error { +func (r *runner) deleteDataStreamDocs(ctx context.Context, dataStream string) error { body := strings.NewReader(`{ "query": { "match_all": {} } }`) - resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body) + resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body, + r.options.ESAPI.DeleteByQuery.WithContext(ctx), + ) if err != nil { return fmt.Errorf("failed to delete data stream docs for data stream %s: %w", dataStream, err) } @@ -484,8 +486,11 @@ func (r *runner) collectBulkRequestBody(indexName, scenarioName string, buf *byt return bulkBodyBuilder, nil } -func (r *runner) performBulkRequest(bulkRequest string) error { - resp, err := r.options.ESAPI.Bulk(strings.NewReader(bulkRequest)) +func (r *runner) performBulkRequest(ctx context.Context, bulkRequest string) error { + resp, err := r.options.ESAPI.Bulk(strings.NewReader(bulkRequest), + r.options.ESAPI.Bulk.WithContext(ctx), + ) + if err != nil { return err } @@ -595,7 +600,7 @@ func (r *runner) runStreamGenerator(ctx context.Context, scenarioName string) er } } - err := r.performBulkRequest(bulkBodyBuilder.String()) + err := r.performBulkRequest(ctx, bulkBodyBuilder.String()) if err != nil { return fmt.Errorf("error performing bulk request: %w", err) } @@ -628,7 +633,7 @@ func (r *runner) runBackfillGenerator(ctx context.Context, scenarioName string) } } - return r.performBulkRequest(bulkBodyBuilder.String()) + return r.performBulkRequest(ctx, bulkBodyBuilder.String()) } type benchMeta struct { diff --git a/internal/benchrunner/runners/system/runner.go b/internal/benchrunner/runners/system/runner.go index 067f4c7eb9..d5d49fe723 100644 --- a/internal/benchrunner/runners/system/runner.go +++ b/internal/benchrunner/runners/system/runner.go @@ -204,13 +204,13 @@ func (r *runner) setUp(ctx context.Context) error { r.wipeDataStreamHandler = func(ctx context.Context) error { logger.Debugf("deleting data in data stream...") - if err := r.deleteDataStreamDocs(r.runtimeDataStream); err != nil { + if err := r.deleteDataStreamDocs(ctx, r.runtimeDataStream); err != nil { return fmt.Errorf("error deleting data in data stream: %w", err) } return nil } - if err := r.deleteDataStreamDocs(r.runtimeDataStream); err != nil { + if err := r.deleteDataStreamDocs(ctx, r.runtimeDataStream); err != nil { return fmt.Errorf("error deleting old data in data stream: %s: %w", r.runtimeDataStream, err) } @@ -276,7 +276,7 @@ func (r *runner) run(ctx context.Context) (report reporters.Reportable, err erro return nil, fmt.Errorf("can't summarize metrics: %w", err) } - if err := r.reindexData(); err != nil { + if err := r.reindexData(ctx); err != nil { return nil, fmt.Errorf("can't reindex data: %w", err) } @@ -346,9 +346,11 @@ func (r *runner) collectAndSummarizeMetrics() (*metricsSummary, error) { return sum, err } -func (r *runner) deleteDataStreamDocs(dataStream string) error { +func (r *runner) deleteDataStreamDocs(ctx context.Context, dataStream string) error { body := strings.NewReader(`{ "query": { "match_all": {} } }`) - resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body) + resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body, + r.options.ESAPI.DeleteByQuery.WithContext(ctx), + ) if err != nil { return fmt.Errorf("failed to delete docs for data stream %s: %w", dataStream, err) } @@ -736,7 +738,7 @@ func (r *runner) enrollAgents(ctx context.Context) error { } // reindexData will read all data generated during the benchmark and will reindex it to the metrisctore -func (r *runner) reindexData() error { +func (r *runner) reindexData(ctx context.Context) error { if !r.options.ReindexData { return nil } @@ -749,6 +751,7 @@ func (r *runner) reindexData() error { logger.Debug("getting orignal mappings...") // Get the mapping from the source data stream mappingRes, err := r.options.ESAPI.Indices.GetMapping( + r.options.ESAPI.Indices.GetMapping.WithContext(ctx), r.options.ESAPI.Indices.GetMapping.WithIndex(r.runtimeDataStream), ) if err != nil { @@ -794,6 +797,7 @@ func (r *runner) reindexData() error { createRes, err := r.options.ESMetricsAPI.Indices.Create( indexName, + r.options.ESMetricsAPI.Indices.Create.WithContext(ctx), r.options.ESMetricsAPI.Indices.Create.WithBody(reader), ) if err != nil { @@ -809,6 +813,7 @@ func (r *runner) reindexData() error { logger.Debug("starting scrolling of events...") resp, err := r.options.ESAPI.Search( + r.options.ESAPI.Search.WithContext(ctx), r.options.ESAPI.Search.WithIndex(r.runtimeDataStream), r.options.ESAPI.Search.WithBody(bodyReader), r.options.ESAPI.Search.WithScroll(time.Minute), @@ -838,7 +843,7 @@ func (r *runner) reindexData() error { break } - err := r.bulkMetrics(indexName, sr) + err := r.bulkMetrics(ctx, indexName, sr) if err != nil { return err } @@ -859,7 +864,7 @@ type searchResponse struct { } `json:"hits"` } -func (r *runner) bulkMetrics(indexName string, sr searchResponse) error { +func (r *runner) bulkMetrics(ctx context.Context, indexName string, sr searchResponse) error { var bulkBodyBuilder strings.Builder for _, hit := range sr.Hits { bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.ID)) @@ -873,7 +878,9 @@ func (r *runner) bulkMetrics(indexName string, sr searchResponse) error { logger.Debugf("bulk request of %d events...", len(sr.Hits)) - resp, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String())) + resp, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String()), + r.options.ESMetricsAPI.Bulk.WithContext(ctx), + ) if err != nil { return fmt.Errorf("error performing the bulk index request: %w", err) } @@ -887,6 +894,7 @@ func (r *runner) bulkMetrics(indexName string, sr searchResponse) error { } resp, err = r.options.ESAPI.Scroll( + r.options.ESAPI.Scroll.WithContext(ctx), r.options.ESAPI.Scroll.WithScrollID(sr.ScrollID), r.options.ESAPI.Scroll.WithScroll(time.Minute), ) diff --git a/internal/elasticsearch/ingest/pipeline.go b/internal/elasticsearch/ingest/pipeline.go index 16fde116b7..660717c80c 100644 --- a/internal/elasticsearch/ingest/pipeline.go +++ b/internal/elasticsearch/ingest/pipeline.go @@ -6,6 +6,7 @@ package ingest import ( "bytes" + "context" "encoding/json" "fmt" "io" @@ -72,7 +73,7 @@ func (p *Pipeline) MarshalJSON() (asJSON []byte, err error) { return asJSON, nil } -func SimulatePipeline(api *elasticsearch.API, pipelineName string, events []json.RawMessage, simulateDataStream string) ([]json.RawMessage, error) { +func SimulatePipeline(ctx context.Context, api *elasticsearch.API, pipelineName string, events []json.RawMessage, simulateDataStream string) ([]json.RawMessage, error) { var request simulatePipelineRequest for _, event := range events { request.Docs = append(request.Docs, pipelineDocument{ @@ -86,9 +87,10 @@ func SimulatePipeline(api *elasticsearch.API, pipelineName string, events []json return nil, fmt.Errorf("marshalling simulate request failed: %w", err) } - r, err := api.Ingest.Simulate(bytes.NewReader(requestBody), func(request *elasticsearch.IngestSimulateRequest) { - request.PipelineID = pipelineName - }) + r, err := api.Ingest.Simulate(bytes.NewReader(requestBody), + api.Ingest.Simulate.WithContext(ctx), + api.Ingest.Simulate.WithPipelineID(pipelineName), + ) if err != nil { return nil, fmt.Errorf("simulate API call failed (pipelineName: %s): %w", pipelineName, err) } @@ -116,9 +118,9 @@ func SimulatePipeline(api *elasticsearch.API, pipelineName string, events []json return processedEvents, nil } -func UninstallPipelines(api *elasticsearch.API, pipelines []Pipeline) error { +func UninstallPipelines(ctx context.Context, api *elasticsearch.API, pipelines []Pipeline) error { for _, p := range pipelines { - err := uninstallPipeline(api, p.Name) + err := uninstallPipeline(ctx, api, p.Name) if err != nil { return err } @@ -126,8 +128,8 @@ func UninstallPipelines(api *elasticsearch.API, pipelines []Pipeline) error { return nil } -func uninstallPipeline(api *elasticsearch.API, name string) error { - resp, err := api.Ingest.DeletePipeline(name) +func uninstallPipeline(ctx context.Context, api *elasticsearch.API, name string) error { + resp, err := api.Ingest.DeletePipeline(name, api.Ingest.DeletePipeline.WithContext(ctx)) if err != nil { return fmt.Errorf("delete pipeline API call failed (pipelineName: %s): %w", name, err) } diff --git a/internal/testrunner/runners/pipeline/runner.go b/internal/testrunner/runners/pipeline/runner.go index 89c28714cb..f7f85f68ce 100644 --- a/internal/testrunner/runners/pipeline/runner.go +++ b/internal/testrunner/runners/pipeline/runner.go @@ -101,7 +101,7 @@ func (r *runner) TearDown(ctx context.Context) error { } } - if err := ingest.UninstallPipelines(r.options.API, r.pipelines); err != nil { + if err := ingest.UninstallPipelines(ctx, r.options.API, r.pipelines); err != nil { return fmt.Errorf("uninstalling ingest pipelines failed: %w", err) } return nil @@ -187,7 +187,7 @@ func (r *runner) run(ctx context.Context) ([]testrunner.TestResult, error) { fields.WithExpectedDatasets(expectedDatasets), fields.WithEnabledImportAllECSSChema(true), } - result, err := r.runTestCase(testCaseFile, dataStreamPath, dsManifest.Type, entryPipeline, validatorOptions) + result, err := r.runTestCase(ctx, testCaseFile, dataStreamPath, dsManifest.Type, entryPipeline, validatorOptions) if err != nil { return nil, err } @@ -259,7 +259,7 @@ func (r *runner) checkElasticsearchLogs(ctx context.Context, startTesting time.T } -func (r *runner) runTestCase(testCaseFile string, dsPath string, dsType string, pipeline string, validatorOptions []fields.ValidatorOption) (testrunner.TestResult, error) { +func (r *runner) runTestCase(ctx context.Context, testCaseFile string, dsPath string, dsType string, pipeline string, validatorOptions []fields.ValidatorOption) (testrunner.TestResult, error) { tr := testrunner.TestResult{ TestType: TestType, Package: r.options.TestFolder.Package, @@ -285,7 +285,7 @@ func (r *runner) runTestCase(testCaseFile string, dsPath string, dsType string, } simulateDataStream := dsType + "-" + r.options.TestFolder.Package + "." + r.options.TestFolder.DataStream + "-default" - processedEvents, err := ingest.SimulatePipeline(r.options.API, pipeline, tc.events, simulateDataStream) + processedEvents, err := ingest.SimulatePipeline(ctx, r.options.API, pipeline, tc.events, simulateDataStream) if err != nil { err := fmt.Errorf("simulating pipeline processing failed: %w", err) tr.ErrorMsg = err.Error()