From 918a39d6f1cc005d75995b79120a43268cdbdaf4 Mon Sep 17 00:00:00 2001 From: Nicolas Ruflin Date: Wed, 20 Dec 2023 16:41:35 +0100 Subject: [PATCH 1/2] Fix runner shutdown, improve error message There are two fixes in this PR. The first one is around the runner being stuck in case of errors. Because a channel is used for the errors, in case 2 errors happen in a row before the error is consumed, the go routine gets stuck. As the effect of an error is stopping the generation anyways, now on error the progress on function is aborted with return. As the errors are mostly used for logging, it raises the question if the channels are needed in the first place but further cleanup is out of scope of this PR. The second change is around error logging for scenarios where the template generates invalid JSON. For easier debugging, it now prints out the full invalid JSON event as a log event and gives an indication on where the problem lies. --- internal/benchrunner/runners/stream/runner.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/benchrunner/runners/stream/runner.go b/internal/benchrunner/runners/stream/runner.go index 5559e66768..46da235fa1 100644 --- a/internal/benchrunner/runners/stream/runner.go +++ b/internal/benchrunner/runners/stream/runner.go @@ -430,7 +430,9 @@ func (r *runner) collectBulkRequestBody(indexName, scenarioName string, buf *byt var event map[string]any err = json.Unmarshal(buf.Bytes(), &event) if err != nil { - return bulkBodyBuilder, err + return bulkBodyBuilder, fmt.Errorf("problem unmarshaling json event: %s\n"+ + "Outcome of template for scenario '%s' is invalid JSON. Check your benchmark template.", + buf.String(), scenarioName) } enriched := r.enrichEventWithBenchmarkMetadata(event) src, err := json.Marshal(enriched) @@ -505,6 +507,7 @@ func (r *runner) streamData() { if err != nil { r.errChanGenerators <- fmt.Errorf("error while generating event for streaming: %w", err) + return } } @@ -534,6 +537,7 @@ func (r *runner) streamData() { if err != nil { r.errChanGenerators <- fmt.Errorf("error while generating event for streaming: %w", err) + return } } From d3139e7ae25c6e673167436b96a4f20a3d25e52f Mon Sep 17 00:00:00 2001 From: Nicolas Ruflin Date: Thu, 21 Dec 2023 09:47:48 +0100 Subject: [PATCH 2/2] Update internal/benchrunner/runners/stream/runner.go Co-authored-by: Jaime Soriano Pastor --- internal/benchrunner/runners/stream/runner.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/internal/benchrunner/runners/stream/runner.go b/internal/benchrunner/runners/stream/runner.go index 46da235fa1..ef50cbe90a 100644 --- a/internal/benchrunner/runners/stream/runner.go +++ b/internal/benchrunner/runners/stream/runner.go @@ -430,9 +430,8 @@ func (r *runner) collectBulkRequestBody(indexName, scenarioName string, buf *byt var event map[string]any err = json.Unmarshal(buf.Bytes(), &event) if err != nil { - return bulkBodyBuilder, fmt.Errorf("problem unmarshaling json event: %s\n"+ - "Outcome of template for scenario '%s' is invalid JSON. Check your benchmark template.", - buf.String(), scenarioName) + logger.Debugf("Problem found when unmarshalling document: %s", buf.String()) + return bulkBodyBuilder, fmt.Errorf("failed to unmarshal json event, check your benchmark template for scenario %s: %w", scenarioName, err) } enriched := r.enrichEventWithBenchmarkMetadata(event) src, err := json.Marshal(enriched)