Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics engine tests #2878

Merged
merged 2 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/v1/group_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func getControlSurface(tb testing.TB, testState *lib.TestRunState) *ControlSurfa
execScheduler, err := execution.NewScheduler(testState)
require.NoError(tb, err)

me, err := engine.NewMetricsEngine(execScheduler.GetState())
me, err := engine.NewMetricsEngine(testState)
require.NoError(tb, err)

ctx, cancel := context.WithCancel(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion api/v1/setup_teardown_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestSetupData(t *testing.T) {

execScheduler, err := execution.NewScheduler(testState)
require.NoError(t, err)
metricsEngine, err := engine.NewMetricsEngine(execScheduler.GetState())
metricsEngine, err := engine.NewMetricsEngine(testState)
require.NoError(t, err)

globalCtx, globalCancel := context.WithCancel(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion api/v1/status_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func TestPatchStatus(t *testing.T) {
execScheduler, err := execution.NewScheduler(testState)
require.NoError(t, err)

metricsEngine, err := engine.NewMetricsEngine(execScheduler.GetState())
metricsEngine, err := engine.NewMetricsEngine(testState)
require.NoError(t, err)

globalCtx, globalCancel := context.WithCancel(context.Background())
Expand Down
4 changes: 2 additions & 2 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
}

executionState := execScheduler.GetState()
metricsEngine, err := engine.NewMetricsEngine(executionState)
metricsEngine, err := engine.NewMetricsEngine(executionState.Test)
if err != nil {
return err
}
Expand Down Expand Up @@ -178,7 +178,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
}()

if !testRunState.RuntimeOptions.NoThresholds.Bool { //nolint:nestif
finalizeThresholds := metricsEngine.StartThresholdCalculations(runAbort)
finalizeThresholds := metricsEngine.StartThresholdCalculations(runAbort, executionState.GetCurrentTestRunDuration)
defer func() {
if finalizeThresholds == nil {
return
Expand Down
4 changes: 2 additions & 2 deletions js/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func TestDataIsolation(t *testing.T) {
execScheduler, err := execution.NewScheduler(testRunState)
require.NoError(t, err)

metricsEngine, err := engine.NewMetricsEngine(execScheduler.GetState())
metricsEngine, err := engine.NewMetricsEngine(testRunState)
require.NoError(t, err)

globalCtx, globalCancel := context.WithCancel(context.Background())
Expand All @@ -401,7 +401,7 @@ func TestDataIsolation(t *testing.T) {
require.NoError(t, err)
defer stopOutputs(nil)

finalizeThresholds := metricsEngine.StartThresholdCalculations(runAbort)
finalizeThresholds := metricsEngine.StartThresholdCalculations(runAbort, execScheduler.GetState().GetCurrentTestRunDuration)
require.Nil(t, finalizeThresholds)

require.Empty(t, runner.defaultGroup.Groups)
Expand Down
40 changes: 22 additions & 18 deletions metrics/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ const thresholdsRate = 2 * time.Second
// aggregated metric sample values. They are used to generate the end-of-test
// summary and to evaluate the test thresholds.
type MetricsEngine struct {
es *lib.ExecutionState
logger logrus.FieldLogger

logger logrus.FieldLogger
test *lib.TestRunState
outputIngester *outputIngester

// These can be both top-level metrics or sub-metrics
Expand All @@ -45,15 +44,14 @@ type MetricsEngine struct {
}

// NewMetricsEngine creates a new metrics Engine with the given parameters.
func NewMetricsEngine(es *lib.ExecutionState) (*MetricsEngine, error) {
func NewMetricsEngine(runState *lib.TestRunState) (*MetricsEngine, error) {
me := &MetricsEngine{
es: es,
logger: es.Test.Logger.WithField("component", "metrics-engine"),

test: runState,
logger: runState.Logger.WithField("component", "metrics-engine"),
ObservedMetrics: make(map[string]*metrics.Metric),
}

if !(me.es.Test.RuntimeOptions.NoSummary.Bool && me.es.Test.RuntimeOptions.NoThresholds.Bool) {
if !(me.test.RuntimeOptions.NoSummary.Bool && me.test.RuntimeOptions.NoThresholds.Bool) {
err := me.initSubMetricsAndThresholds()
if err != nil {
return nil, err
Expand All @@ -77,10 +75,11 @@ func (me *MetricsEngine) getThresholdMetricOrSubmetric(name string) (*metrics.Me
// TODO: replace with strings.Cut after Go 1.18
nameParts := strings.SplitN(name, "{", 2)

metric := me.es.Test.Registry.Get(nameParts[0])
metric := me.test.Registry.Get(nameParts[0])
if metric == nil {
return nil, fmt.Errorf("metric '%s' does not exist in the script", nameParts[0])
}

if len(nameParts) == 1 { // no sub-metric
return metric, nil
}
Expand Down Expand Up @@ -126,10 +125,10 @@ func (me *MetricsEngine) markObserved(metric *metrics.Metric) {
}

func (me *MetricsEngine) initSubMetricsAndThresholds() error {
for metricName, thresholds := range me.es.Test.Options.Thresholds {
for metricName, thresholds := range me.test.Options.Thresholds {
metric, err := me.getThresholdMetricOrSubmetric(metricName)

if me.es.Test.RuntimeOptions.NoThresholds.Bool {
if me.test.RuntimeOptions.NoThresholds.Bool {
if err != nil {
me.logger.WithError(err).Warnf("Invalid metric '%s' in threshold definitions", metricName)
}
Expand All @@ -154,7 +153,7 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error {

// TODO: refactor out of here when https://github.com/grafana/k6/issues/1321
// lands and there is a better way to enable a metric with tag
if me.es.Test.Options.SystemTags.Has(metrics.TagExpectedResponse) {
if me.test.Options.SystemTags.Has(metrics.TagExpectedResponse) {
_, err := me.getThresholdMetricOrSubmetric("http_req_duration{expected_response:true}")
if err != nil {
return err // shouldn't happen, but ¯\_(ツ)_/¯
Expand All @@ -166,8 +165,10 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error {

// StartThresholdCalculations spins up a new goroutine to crunch thresholds and
// returns a callback that will stop the goroutine and finalizes calculations.
func (me *MetricsEngine) StartThresholdCalculations(abortRun func(error)) (
finalize func() (breached []string),
func (me *MetricsEngine) StartThresholdCalculations(
abortRun func(error),
getCurrentTestRunDuration func() time.Duration,
) (finalize func() (breached []string),
) {
if len(me.metricsWithThresholds) == 0 {
return nil // no thresholds were defined
Expand All @@ -184,7 +185,7 @@ func (me *MetricsEngine) StartThresholdCalculations(abortRun func(error)) (
for {
select {
case <-ticker.C:
breached, shouldAbort := me.evaluateThresholds(true)
breached, shouldAbort := me.evaluateThresholds(true, getCurrentTestRunDuration)
if shouldAbort {
err := fmt.Errorf(
"thresholds on metrics '%s' were breached; at least one has abortOnFail enabled, stopping test prematurely",
Expand Down Expand Up @@ -213,19 +214,22 @@ func (me *MetricsEngine) StartThresholdCalculations(abortRun func(error)) (
close(stop)
<-done

breached, _ := me.evaluateThresholds(false)
breached, _ := me.evaluateThresholds(false, getCurrentTestRunDuration)
return breached
}
}

// evaluateThresholds processes all of the thresholds.
//
// TODO: refactor, optimize
func (me *MetricsEngine) evaluateThresholds(ignoreEmptySinks bool) (breachedThresholds []string, shouldAbort bool) {
func (me *MetricsEngine) evaluateThresholds(
ignoreEmptySinks bool,
getCurrentTestRunDuration func() time.Duration,
) (breachedThresholds []string, shouldAbort bool) {
me.MetricsLock.Lock()
defer me.MetricsLock.Unlock()

t := me.es.GetCurrentTestRunDuration()
t := getCurrentTestRunDuration()

me.logger.Debugf("Running thresholds on %d metrics...", len(me.metricsWithThresholds))
for _, m := range me.metricsWithThresholds {
Expand Down
Loading