From 7d0999fbe2d417fb6e1f6b145cebca54ce2f866d Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Molins Date: Thu, 18 Apr 2024 11:34:34 +0200 Subject: [PATCH 1/5] Add context to Elasticsearch calls --- .../benchrunner/runners/pipeline/benchmark.go | 13 +++--- .../benchrunner/runners/pipeline/runner.go | 8 ++-- internal/benchrunner/runners/rally/runner.go | 40 ++++++++++++------- internal/benchrunner/runners/stream/runner.go | 25 +++++++----- internal/benchrunner/runners/system/runner.go | 27 ++++++++----- internal/elasticsearch/ingest/pipeline.go | 13 +++--- .../testrunner/runners/pipeline/runner.go | 8 ++-- 7 files changed, 81 insertions(+), 53 deletions(-) diff --git a/internal/benchrunner/runners/pipeline/benchmark.go b/internal/benchrunner/runners/pipeline/benchmark.go index 0b3de8b946..2784dc1b23 100644 --- a/internal/benchrunner/runners/pipeline/benchmark.go +++ b/internal/benchrunner/runners/pipeline/benchmark.go @@ -5,6 +5,7 @@ package pipeline import ( + "context" "encoding/json" "errors" "fmt" @@ -75,9 +76,9 @@ func (p BenchmarkValue) String() (r string) { return r } -func (r *runner) benchmarkPipeline(b *benchmark, entryPipeline string) (*BenchmarkResult, error) { +func (r *runner) benchmarkPipeline(ctx context.Context, b *benchmark, entryPipeline string) (*BenchmarkResult, error) { // Run benchmark - bench, err := r.benchmarkIngest(b, entryPipeline) + bench, err := r.benchmarkIngest(ctx, b, entryPipeline) if err != nil { return nil, fmt.Errorf("failed running benchmark: %w", err) } @@ -196,9 +197,9 @@ type ingestResult struct { numDocs int } -func (r *runner) benchmarkIngest(b *benchmark, entryPipeline string) (ingestResult, error) { +func (r *runner) benchmarkIngest(ctx context.Context, b *benchmark, entryPipeline string) (ingestResult, error) { baseDocs := resizeDocs(b.events, b.config.NumDocs) - return r.runSingleBenchmark(entryPipeline, baseDocs) + return r.runSingleBenchmark(ctx, entryPipeline, baseDocs) } type processorPerformance struct { @@ -298,12 +299,12 @@ func (agg aggregation) collect(fn mapFn) ([]BenchmarkValue, error) { return r, nil } -func (r *runner) runSingleBenchmark(entryPipeline string, docs []json.RawMessage) (ingestResult, error) { +func (r *runner) runSingleBenchmark(ctx context.Context, entryPipeline string, docs []json.RawMessage) (ingestResult, error) { if len(docs) == 0 { return ingestResult{}, errors.New("no docs supplied for benchmark") } - if _, err := ingest.SimulatePipeline(r.options.API, entryPipeline, docs, "test-generic-default"); err != nil { + if _, err := ingest.SimulatePipeline(ctx, r.options.API, entryPipeline, docs, "test-generic-default"); err != nil { return ingestResult{}, fmt.Errorf("simulate failed: %w", err) } diff --git a/internal/benchrunner/runners/pipeline/runner.go b/internal/benchrunner/runners/pipeline/runner.go index bc917c22be..c43c47e05b 100644 --- a/internal/benchrunner/runners/pipeline/runner.go +++ b/internal/benchrunner/runners/pipeline/runner.go @@ -58,7 +58,7 @@ func (r *runner) SetUp(ctx context.Context) error { // TearDown shuts down the pipeline benchmark runner. 
func (r *runner) TearDown(ctx context.Context) error { - if err := ingest.UninstallPipelines(r.options.API, r.pipelines); err != nil { + if err := ingest.UninstallPipelines(ctx, r.options.API, r.pipelines); err != nil { return fmt.Errorf("uninstalling ingest pipelines failed: %w", err) } return nil @@ -66,16 +66,16 @@ func (r *runner) TearDown(ctx context.Context) error { // Run runs the pipeline benchmarks defined under the given folder func (r *runner) Run(ctx context.Context) (reporters.Reportable, error) { - return r.run() + return r.run(ctx) } -func (r *runner) run() (reporters.Reportable, error) { +func (r *runner) run(ctx context.Context) (reporters.Reportable, error) { b, err := r.loadBenchmark() if err != nil { return nil, fmt.Errorf("loading benchmark failed: %w", err) } - benchmark, err := r.benchmarkPipeline(b, r.entryPipeline) + benchmark, err := r.benchmarkPipeline(ctx, b, r.entryPipeline) if err != nil { return nil, err } diff --git a/internal/benchrunner/runners/rally/runner.go b/internal/benchrunner/runners/rally/runner.go index e936b402aa..fb5af8a50f 100644 --- a/internal/benchrunner/runners/rally/runner.go +++ b/internal/benchrunner/runners/rally/runner.go @@ -319,13 +319,13 @@ func (r *runner) setUp(ctx context.Context) error { dataStreamManifest.Name, ) - r.indexTemplateBody, err = r.extractSimulatedTemplate(indexTemplate) + r.indexTemplateBody, err = r.extractSimulatedTemplate(ctx, indexTemplate) if err != nil { return fmt.Errorf("error extracting routing path: %s: %w", indexTemplate, err) } } - if err := r.wipeDataStreamOnSetup(); err != nil { + if err := r.wipeDataStreamOnSetup(ctx); err != nil { return fmt.Errorf("error deleting old data in data stream: %s: %w", r.runtimeDataStream, err) } @@ -343,8 +343,11 @@ func (r *runner) setUp(ctx context.Context) error { return nil } -func (r *runner) extractSimulatedTemplate(indexTemplate string) (string, error) { - simulateTemplate, err := r.options.ESAPI.Indices.SimulateTemplate(r.options.ESAPI.Indices.SimulateTemplate.WithName(indexTemplate)) +func (r *runner) extractSimulatedTemplate(ctx context.Context, indexTemplate string) (string, error) { + simulateTemplate, err := r.options.ESAPI.Indices.SimulateTemplate( + r.options.ESAPI.Indices.SimulateTemplate.WithContext(ctx), + r.options.ESAPI.Indices.SimulateTemplate.WithName(indexTemplate), + ) if err != nil { return "", fmt.Errorf("error simulating template from composable template: %s: %w", indexTemplate, err) } @@ -384,18 +387,18 @@ func (r *runner) extractSimulatedTemplate(indexTemplate string) (string, error) return string(newTemplate), nil } -func (r *runner) wipeDataStreamOnSetup() error { +func (r *runner) wipeDataStreamOnSetup(ctx context.Context) error { // Delete old data logger.Debug("deleting old data in data stream...") r.wipeDataStreamHandler = func(ctx context.Context) error { logger.Debugf("deleting data in data stream...") - if err := r.deleteDataStreamDocs(r.runtimeDataStream); err != nil { + if err := r.deleteDataStreamDocs(ctx, r.runtimeDataStream); err != nil { return fmt.Errorf("error deleting data in data stream: %w", err) } return nil } - return r.deleteDataStreamDocs(r.runtimeDataStream) + return r.deleteDataStreamDocs(ctx, r.runtimeDataStream) } func (r *runner) run(ctx context.Context) (report reporters.Reportable, err error) { @@ -443,7 +446,7 @@ func (r *runner) run(ctx context.Context) (report reporters.Reportable, err erro return nil, fmt.Errorf("can't summarize metrics: %w", err) } - if err := r.reindexData(); err != nil { + if err := 
r.reindexData(ctx); err != nil { return nil, fmt.Errorf("can't reindex data: %w", err) } @@ -529,9 +532,11 @@ func (r *runner) collectAndSummarizeMetrics() (*metricsSummary, error) { return sum, err } -func (r *runner) deleteDataStreamDocs(dataStream string) error { +func (r *runner) deleteDataStreamDocs(ctx context.Context, dataStream string) error { body := strings.NewReader(`{ "query": { "match_all": {} } }`) - resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body) + resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body, + r.options.ESAPI.DeleteByQuery.WithContext(ctx), + ) if err != nil { return fmt.Errorf("failed to delete data stream docs for data stream %s: %w", dataStream, err) } @@ -941,7 +946,7 @@ func (r *runner) runRally(ctx context.Context) ([]rallyStat, error) { } // reindexData will read all data generated during the benchmark and will reindex it to the metrisctore -func (r *runner) reindexData() error { +func (r *runner) reindexData(ctx context.Context) error { if !r.options.ReindexData { return nil } @@ -954,6 +959,7 @@ func (r *runner) reindexData() error { logger.Debug("getting orignal mappings...") // Get the mapping from the source data stream mappingRes, err := r.options.ESAPI.Indices.GetMapping( + r.options.ESAPI.Indices.GetMapping.WithContext(ctx), r.options.ESAPI.Indices.GetMapping.WithIndex(r.runtimeDataStream), ) if err != nil { @@ -999,6 +1005,7 @@ func (r *runner) reindexData() error { createRes, err := r.options.ESMetricsAPI.Indices.Create( indexName, + r.options.ESMetricsAPI.Indices.Create.WithContext(ctx), r.options.ESMetricsAPI.Indices.Create.WithBody(reader), ) if err != nil { @@ -1014,6 +1021,7 @@ func (r *runner) reindexData() error { logger.Debug("starting scrolling of events...") res, err := r.options.ESAPI.Search( + r.options.ESAPI.Search.WithContext(ctx), r.options.ESAPI.Search.WithIndex(r.runtimeDataStream), r.options.ESAPI.Search.WithBody(bodyReader), r.options.ESAPI.Search.WithScroll(time.Minute), @@ -1042,7 +1050,7 @@ func (r *runner) reindexData() error { break } - err := r.bulkMetrics(indexName, sr) + err := r.bulkMetrics(ctx, indexName, sr) if err != nil { return err } @@ -1063,7 +1071,7 @@ type searchResponse struct { } `json:"hits"` } -func (r *runner) bulkMetrics(indexName string, sr searchResponse) error { +func (r *runner) bulkMetrics(ctx context.Context, indexName string, sr searchResponse) error { var bulkBodyBuilder strings.Builder for _, hit := range sr.Hits { bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.ID)) @@ -1077,7 +1085,10 @@ func (r *runner) bulkMetrics(indexName string, sr searchResponse) error { logger.Debugf("bulk request of %d events...", len(sr.Hits)) - resp, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String())) + resp, err := r.options.ESMetricsAPI.Bulk( + strings.NewReader(bulkBodyBuilder.String()), + r.options.ESMetricsAPI.Bulk.WithContext(ctx), + ) if err != nil { return fmt.Errorf("error performing the bulk index request: %w", err) } @@ -1091,6 +1102,7 @@ func (r *runner) bulkMetrics(indexName string, sr searchResponse) error { } resp, err = r.options.ESAPI.Scroll( + r.options.ESAPI.Scroll.WithContext(ctx), r.options.ESAPI.Scroll.WithScrollID(sr.ScrollID), r.options.ESAPI.Scroll.WithScroll(time.Minute), ) diff --git a/internal/benchrunner/runners/stream/runner.go b/internal/benchrunner/runners/stream/runner.go index e7c33b0608..a372209479 100644 --- a/internal/benchrunner/runners/stream/runner.go +++ 
b/internal/benchrunner/runners/stream/runner.go @@ -199,7 +199,7 @@ func (r *runner) setUp(ctx context.Context) error { return nil } - if err := r.wipeDataStreamsOnSetup(); err != nil { + if err := r.wipeDataStreamsOnSetup(ctx); err != nil { return fmt.Errorf("error cleaning up old data in data streams: %w", err) } @@ -224,13 +224,13 @@ func (r *runner) setUp(ctx context.Context) error { return nil } -func (r *runner) wipeDataStreamsOnSetup() error { +func (r *runner) wipeDataStreamsOnSetup(ctx context.Context) error { // Delete old data logger.Debug("deleting old data in data stream...") r.wipeDataStreamHandler = func(ctx context.Context) error { logger.Debugf("deleting data in data stream...") for _, runtimeDataStream := range r.runtimeDataStreams { - if err := r.deleteDataStreamDocs(runtimeDataStream); err != nil { + if err := r.deleteDataStreamDocs(ctx, runtimeDataStream); err != nil { return fmt.Errorf("error deleting data in data stream: %w", err) } } @@ -238,7 +238,7 @@ func (r *runner) wipeDataStreamsOnSetup() error { } for _, runtimeDataStream := range r.runtimeDataStreams { - if err := r.deleteDataStreamDocs(runtimeDataStream); err != nil { + if err := r.deleteDataStreamDocs(ctx, runtimeDataStream); err != nil { return fmt.Errorf("error deleting data in data stream: %w", err) } } @@ -278,9 +278,11 @@ func (r *runner) installPackageFromPackageRoot(ctx context.Context) error { return nil } -func (r *runner) deleteDataStreamDocs(dataStream string) error { +func (r *runner) deleteDataStreamDocs(ctx context.Context, dataStream string) error { body := strings.NewReader(`{ "query": { "match_all": {} } }`) - resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body) + resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body, + r.options.ESAPI.DeleteByQuery.WithContext(ctx), + ) if err != nil { return fmt.Errorf("failed to delete data stream docs for data stream %s: %w", dataStream, err) } @@ -484,8 +486,11 @@ func (r *runner) collectBulkRequestBody(indexName, scenarioName string, buf *byt return bulkBodyBuilder, nil } -func (r *runner) performBulkRequest(bulkRequest string) error { - resp, err := r.options.ESAPI.Bulk(strings.NewReader(bulkRequest)) +func (r *runner) performBulkRequest(ctx context.Context, bulkRequest string) error { + resp, err := r.options.ESAPI.Bulk(strings.NewReader(bulkRequest), + r.options.ESAPI.Bulk.WithContext(ctx), + ) + if err != nil { return err } @@ -595,7 +600,7 @@ func (r *runner) runStreamGenerator(ctx context.Context, scenarioName string) er } } - err := r.performBulkRequest(bulkBodyBuilder.String()) + err := r.performBulkRequest(ctx, bulkBodyBuilder.String()) if err != nil { return fmt.Errorf("error performing bulk request: %w", err) } @@ -628,7 +633,7 @@ func (r *runner) runBackfillGenerator(ctx context.Context, scenarioName string) } } - return r.performBulkRequest(bulkBodyBuilder.String()) + return r.performBulkRequest(ctx, bulkBodyBuilder.String()) } type benchMeta struct { diff --git a/internal/benchrunner/runners/system/runner.go b/internal/benchrunner/runners/system/runner.go index 067f4c7eb9..05070f4a48 100644 --- a/internal/benchrunner/runners/system/runner.go +++ b/internal/benchrunner/runners/system/runner.go @@ -204,13 +204,13 @@ func (r *runner) setUp(ctx context.Context) error { r.wipeDataStreamHandler = func(ctx context.Context) error { logger.Debugf("deleting data in data stream...") - if err := r.deleteDataStreamDocs(r.runtimeDataStream); err != nil { + if err := r.deleteDataStreamDocs(ctx, r.runtimeDataStream); 
err != nil { return fmt.Errorf("error deleting data in data stream: %w", err) } return nil } - if err := r.deleteDataStreamDocs(r.runtimeDataStream); err != nil { + if err := r.deleteDataStreamDocs(ctx, r.runtimeDataStream); err != nil { return fmt.Errorf("error deleting old data in data stream: %s: %w", r.runtimeDataStream, err) } @@ -276,7 +276,7 @@ func (r *runner) run(ctx context.Context) (report reporters.Reportable, err erro return nil, fmt.Errorf("can't summarize metrics: %w", err) } - if err := r.reindexData(); err != nil { + if err := r.reindexData(ctx); err != nil { return nil, fmt.Errorf("can't reindex data: %w", err) } @@ -346,9 +346,11 @@ func (r *runner) collectAndSummarizeMetrics() (*metricsSummary, error) { return sum, err } -func (r *runner) deleteDataStreamDocs(dataStream string) error { +func (r *runner) deleteDataStreamDocs(ctx context.Context, dataStream string) error { body := strings.NewReader(`{ "query": { "match_all": {} } }`) - resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body) + resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body, + r.options.ESAPI.DeleteByQuery.WithContext(ctx), + ) if err != nil { return fmt.Errorf("failed to delete docs for data stream %s: %w", dataStream, err) } @@ -736,7 +738,7 @@ func (r *runner) enrollAgents(ctx context.Context) error { } // reindexData will read all data generated during the benchmark and will reindex it to the metrisctore -func (r *runner) reindexData() error { +func (r *runner) reindexData(ctx context.Context) error { if !r.options.ReindexData { return nil } @@ -750,6 +752,7 @@ func (r *runner) reindexData() error { // Get the mapping from the source data stream mappingRes, err := r.options.ESAPI.Indices.GetMapping( r.options.ESAPI.Indices.GetMapping.WithIndex(r.runtimeDataStream), + r.options.ESAPI.Indices.GetMapping.WithContext(ctx), ) if err != nil { return fmt.Errorf("error getting mapping: %w", err) @@ -795,6 +798,7 @@ func (r *runner) reindexData() error { createRes, err := r.options.ESMetricsAPI.Indices.Create( indexName, r.options.ESMetricsAPI.Indices.Create.WithBody(reader), + r.options.ESMetricsAPI.Indices.Create.WithContext(ctx), ) if err != nil { return fmt.Errorf("could not create index: %w", err) @@ -813,6 +817,7 @@ func (r *runner) reindexData() error { r.options.ESAPI.Search.WithBody(bodyReader), r.options.ESAPI.Search.WithScroll(time.Minute), r.options.ESAPI.Search.WithSize(10000), + r.options.ESAPI.Search.WithContext(ctx), ) if err != nil { return fmt.Errorf("error executing search: %w", err) @@ -838,7 +843,7 @@ func (r *runner) reindexData() error { break } - err := r.bulkMetrics(indexName, sr) + err := r.bulkMetrics(ctx, indexName, sr) if err != nil { return err } @@ -859,7 +864,7 @@ type searchResponse struct { } `json:"hits"` } -func (r *runner) bulkMetrics(indexName string, sr searchResponse) error { +func (r *runner) bulkMetrics(ctx context.Context, indexName string, sr searchResponse) error { var bulkBodyBuilder strings.Builder for _, hit := range sr.Hits { bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.ID)) @@ -873,7 +878,10 @@ func (r *runner) bulkMetrics(indexName string, sr searchResponse) error { logger.Debugf("bulk request of %d events...", len(sr.Hits)) - resp, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String())) + resp, err := r.options.ESMetricsAPI.Bulk( + strings.NewReader(bulkBodyBuilder.String()), + r.options.ESMetricsAPI.Bulk.WithContext(ctx), + ) if err != 
nil { return fmt.Errorf("error performing the bulk index request: %w", err) } @@ -889,6 +897,7 @@ func (r *runner) bulkMetrics(indexName string, sr searchResponse) error { resp, err = r.options.ESAPI.Scroll( r.options.ESAPI.Scroll.WithScrollID(sr.ScrollID), r.options.ESAPI.Scroll.WithScroll(time.Minute), + r.options.ESAPI.Scroll.WithContext(ctx), ) if err != nil { return fmt.Errorf("error executing scroll: %s", err) diff --git a/internal/elasticsearch/ingest/pipeline.go b/internal/elasticsearch/ingest/pipeline.go index 16fde116b7..8f1ecbae37 100644 --- a/internal/elasticsearch/ingest/pipeline.go +++ b/internal/elasticsearch/ingest/pipeline.go @@ -6,6 +6,7 @@ package ingest import ( "bytes" + "context" "encoding/json" "fmt" "io" @@ -72,7 +73,7 @@ func (p *Pipeline) MarshalJSON() (asJSON []byte, err error) { return asJSON, nil } -func SimulatePipeline(api *elasticsearch.API, pipelineName string, events []json.RawMessage, simulateDataStream string) ([]json.RawMessage, error) { +func SimulatePipeline(ctx context.Context, api *elasticsearch.API, pipelineName string, events []json.RawMessage, simulateDataStream string) ([]json.RawMessage, error) { var request simulatePipelineRequest for _, event := range events { request.Docs = append(request.Docs, pipelineDocument{ @@ -86,7 +87,7 @@ func SimulatePipeline(api *elasticsearch.API, pipelineName string, events []json return nil, fmt.Errorf("marshalling simulate request failed: %w", err) } - r, err := api.Ingest.Simulate(bytes.NewReader(requestBody), func(request *elasticsearch.IngestSimulateRequest) { + r, err := api.Ingest.Simulate(bytes.NewReader(requestBody), api.Ingest.Simulate.WithContext(ctx), func(request *elasticsearch.IngestSimulateRequest) { request.PipelineID = pipelineName }) if err != nil { @@ -116,9 +117,9 @@ func SimulatePipeline(api *elasticsearch.API, pipelineName string, events []json return processedEvents, nil } -func UninstallPipelines(api *elasticsearch.API, pipelines []Pipeline) error { +func UninstallPipelines(ctx context.Context, api *elasticsearch.API, pipelines []Pipeline) error { for _, p := range pipelines { - err := uninstallPipeline(api, p.Name) + err := uninstallPipeline(ctx, api, p.Name) if err != nil { return err } @@ -126,8 +127,8 @@ func UninstallPipelines(api *elasticsearch.API, pipelines []Pipeline) error { return nil } -func uninstallPipeline(api *elasticsearch.API, name string) error { - resp, err := api.Ingest.DeletePipeline(name) +func uninstallPipeline(ctx context.Context, api *elasticsearch.API, name string) error { + resp, err := api.Ingest.DeletePipeline(name, api.Ingest.DeletePipeline.WithContext(ctx)) if err != nil { return fmt.Errorf("delete pipeline API call failed (pipelineName: %s): %w", name, err) } diff --git a/internal/testrunner/runners/pipeline/runner.go b/internal/testrunner/runners/pipeline/runner.go index 89c28714cb..f7f85f68ce 100644 --- a/internal/testrunner/runners/pipeline/runner.go +++ b/internal/testrunner/runners/pipeline/runner.go @@ -101,7 +101,7 @@ func (r *runner) TearDown(ctx context.Context) error { } } - if err := ingest.UninstallPipelines(r.options.API, r.pipelines); err != nil { + if err := ingest.UninstallPipelines(ctx, r.options.API, r.pipelines); err != nil { return fmt.Errorf("uninstalling ingest pipelines failed: %w", err) } return nil @@ -187,7 +187,7 @@ func (r *runner) run(ctx context.Context) ([]testrunner.TestResult, error) { fields.WithExpectedDatasets(expectedDatasets), fields.WithEnabledImportAllECSSChema(true), } - result, err := r.runTestCase(testCaseFile, 
dataStreamPath, dsManifest.Type, entryPipeline, validatorOptions) + result, err := r.runTestCase(ctx, testCaseFile, dataStreamPath, dsManifest.Type, entryPipeline, validatorOptions) if err != nil { return nil, err } @@ -259,7 +259,7 @@ func (r *runner) checkElasticsearchLogs(ctx context.Context, startTesting time.T } -func (r *runner) runTestCase(testCaseFile string, dsPath string, dsType string, pipeline string, validatorOptions []fields.ValidatorOption) (testrunner.TestResult, error) { +func (r *runner) runTestCase(ctx context.Context, testCaseFile string, dsPath string, dsType string, pipeline string, validatorOptions []fields.ValidatorOption) (testrunner.TestResult, error) { tr := testrunner.TestResult{ TestType: TestType, Package: r.options.TestFolder.Package, @@ -285,7 +285,7 @@ func (r *runner) runTestCase(testCaseFile string, dsPath string, dsType string, } simulateDataStream := dsType + "-" + r.options.TestFolder.Package + "." + r.options.TestFolder.DataStream + "-default" - processedEvents, err := ingest.SimulatePipeline(r.options.API, pipeline, tc.events, simulateDataStream) + processedEvents, err := ingest.SimulatePipeline(ctx, r.options.API, pipeline, tc.events, simulateDataStream) if err != nil { err := fmt.Errorf("simulating pipeline processing failed: %w", err) tr.ErrorMsg = err.Error() From 4d30f7396d9ebb821e08efce0af8d910942c12a8 Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Molins Date: Thu, 18 Apr 2024 11:42:17 +0200 Subject: [PATCH 2/5] Reorder calls --- internal/benchrunner/runners/system/runner.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/benchrunner/runners/system/runner.go b/internal/benchrunner/runners/system/runner.go index 05070f4a48..bec04e99f8 100644 --- a/internal/benchrunner/runners/system/runner.go +++ b/internal/benchrunner/runners/system/runner.go @@ -751,8 +751,8 @@ func (r *runner) reindexData(ctx context.Context) error { logger.Debug("getting orignal mappings...") // Get the mapping from the source data stream mappingRes, err := r.options.ESAPI.Indices.GetMapping( - r.options.ESAPI.Indices.GetMapping.WithIndex(r.runtimeDataStream), r.options.ESAPI.Indices.GetMapping.WithContext(ctx), + r.options.ESAPI.Indices.GetMapping.WithIndex(r.runtimeDataStream), ) if err != nil { return fmt.Errorf("error getting mapping: %w", err) @@ -797,8 +797,8 @@ func (r *runner) reindexData(ctx context.Context) error { createRes, err := r.options.ESMetricsAPI.Indices.Create( indexName, - r.options.ESMetricsAPI.Indices.Create.WithBody(reader), r.options.ESMetricsAPI.Indices.Create.WithContext(ctx), + r.options.ESMetricsAPI.Indices.Create.WithBody(reader), ) if err != nil { return fmt.Errorf("could not create index: %w", err) @@ -813,11 +813,11 @@ func (r *runner) reindexData(ctx context.Context) error { logger.Debug("starting scrolling of events...") resp, err := r.options.ESAPI.Search( + r.options.ESAPI.Search.WithContext(ctx), r.options.ESAPI.Search.WithIndex(r.runtimeDataStream), r.options.ESAPI.Search.WithBody(bodyReader), r.options.ESAPI.Search.WithScroll(time.Minute), r.options.ESAPI.Search.WithSize(10000), - r.options.ESAPI.Search.WithContext(ctx), ) if err != nil { return fmt.Errorf("error executing search: %w", err) @@ -895,9 +895,9 @@ func (r *runner) bulkMetrics(ctx context.Context, indexName string, sr searchRes } resp, err = r.options.ESAPI.Scroll( + r.options.ESAPI.Scroll.WithContext(ctx), r.options.ESAPI.Scroll.WithScrollID(sr.ScrollID), r.options.ESAPI.Scroll.WithScroll(time.Minute), - 
r.options.ESAPI.Scroll.WithContext(ctx), ) if err != nil { return fmt.Errorf("error executing scroll: %s", err) From 6319c3bf7069ad0852a77122782ea9835dfa4f49 Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Molins Date: Thu, 18 Apr 2024 13:16:13 +0200 Subject: [PATCH 3/5] Keep the mandatory argument in the same line as the call Co-authored-by: Jaime Soriano Pastor --- internal/benchrunner/runners/rally/runner.go | 3 +-- internal/benchrunner/runners/system/runner.go | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/internal/benchrunner/runners/rally/runner.go b/internal/benchrunner/runners/rally/runner.go index fb5af8a50f..e4b9e3799a 100644 --- a/internal/benchrunner/runners/rally/runner.go +++ b/internal/benchrunner/runners/rally/runner.go @@ -1085,8 +1085,7 @@ func (r *runner) bulkMetrics(ctx context.Context, indexName string, sr searchRes logger.Debugf("bulk request of %d events...", len(sr.Hits)) - resp, err := r.options.ESMetricsAPI.Bulk( - strings.NewReader(bulkBodyBuilder.String()), + resp, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String()), r.options.ESMetricsAPI.Bulk.WithContext(ctx), ) if err != nil { diff --git a/internal/benchrunner/runners/system/runner.go b/internal/benchrunner/runners/system/runner.go index bec04e99f8..d5d49fe723 100644 --- a/internal/benchrunner/runners/system/runner.go +++ b/internal/benchrunner/runners/system/runner.go @@ -878,8 +878,7 @@ func (r *runner) bulkMetrics(ctx context.Context, indexName string, sr searchRes logger.Debugf("bulk request of %d events...", len(sr.Hits)) - resp, err := r.options.ESMetricsAPI.Bulk( - strings.NewReader(bulkBodyBuilder.String()), + resp, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String()), r.options.ESMetricsAPI.Bulk.WithContext(ctx), ) if err != nil { From 25db91be228a7f9cf3ff5d4ccce9459efcae555a Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Molins Date: Thu, 18 Apr 2024 13:17:48 +0200 Subject: [PATCH 4/5] Set pipeline ID for simulate call with function Co-authored-by: Jaime Soriano Pastor --- internal/elasticsearch/ingest/pipeline.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/elasticsearch/ingest/pipeline.go b/internal/elasticsearch/ingest/pipeline.go index 8f1ecbae37..1c2983cf9e 100644 --- a/internal/elasticsearch/ingest/pipeline.go +++ b/internal/elasticsearch/ingest/pipeline.go @@ -87,8 +87,9 @@ func SimulatePipeline(ctx context.Context, api *elasticsearch.API, pipelineName return nil, fmt.Errorf("marshalling simulate request failed: %w", err) } - r, err := api.Ingest.Simulate(bytes.NewReader(requestBody), api.Ingest.Simulate.WithContext(ctx), func(request *elasticsearch.IngestSimulateRequest) { - request.PipelineID = pipelineName + r, err := api.Ingest.Simulate(bytes.NewReader(requestBody), + api.Ingest.Simulate.WithContext(ctx), + api.Ingest.Simulate.WithPipelineID(pipelineName), }) if err != nil { return nil, fmt.Errorf("simulate API call failed (pipelineName: %s): %w", pipelineName, err) From 386dae36a431d9af881e300b6ce3c6311c70a3a2 Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Molins Date: Thu, 18 Apr 2024 13:24:50 +0200 Subject: [PATCH 5/5] Remove extra brace --- internal/elasticsearch/ingest/pipeline.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/elasticsearch/ingest/pipeline.go b/internal/elasticsearch/ingest/pipeline.go index 1c2983cf9e..660717c80c 100644 --- a/internal/elasticsearch/ingest/pipeline.go +++ b/internal/elasticsearch/ingest/pipeline.go @@ -90,7 +90,7 
@@ func SimulatePipeline(ctx context.Context, api *elasticsearch.API, pipelineName r, err := api.Ingest.Simulate(bytes.NewReader(requestBody), api.Ingest.Simulate.WithContext(ctx), api.Ingest.Simulate.WithPipelineID(pipelineName), - }) + ) if err != nil { return nil, fmt.Errorf("simulate API call failed (pipelineName: %s): %w", pipelineName, err) }
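
For reference, below is a minimal standalone sketch of the pattern these patches apply throughout elastic-package: propagating the caller's context.Context into Elasticsearch API calls through the generated With...() functional options, and setting the simulate pipeline ID via WithPipelineID instead of mutating the request in a closure. It is illustrative only, not part of the patches: it assumes the public github.com/elastic/go-elasticsearch/v8 client rather than the internal/elasticsearch wrapper, and the index name, pipeline name, and document are made up.

package main

import (
	"context"
	"log"
	"strings"
	"time"

	"github.com/elastic/go-elasticsearch/v8"
)

func main() {
	es, err := elasticsearch.NewDefaultClient()
	if err != nil {
		log.Fatalf("error creating the client: %s", err)
	}

	// Deadlines and cancellation only reach the underlying HTTP requests if the
	// caller's context is passed down, which is what these patches wire through.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Search: required arguments stay positional, the context is passed as a
	// WithContext functional option (hypothetical index name).
	searchRes, err := es.Search(
		es.Search.WithContext(ctx),
		es.Search.WithIndex("logs-example-default"),
		es.Search.WithBody(strings.NewReader(`{"query":{"match_all":{}}}`)),
	)
	if err != nil {
		log.Fatalf("error executing search: %s", err)
	}
	defer searchRes.Body.Close()
	log.Println("search:", searchRes.Status())

	// Ingest simulate: same pattern, with the pipeline ID set through the
	// generated WithPipelineID option rather than a custom closure
	// (hypothetical pipeline name and document).
	simulateRes, err := es.Ingest.Simulate(
		strings.NewReader(`{"docs":[{"_source":{"message":"hello"}}]}`),
		es.Ingest.Simulate.WithContext(ctx),
		es.Ingest.Simulate.WithPipelineID("logs-example-default-pipeline"),
	)
	if err != nil {
		log.Fatalf("error simulating pipeline: %s", err)
	}
	defer simulateRes.Body.Close()
	log.Println("simulate:", simulateRes.Status())
}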