forked from grafana/k6
/
constant_arrival_rate.go
374 lines (326 loc) · 13.1 KB
/
constant_arrival_rate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
/*
*
* k6 - a next-generation load testing tool
* Copyright (C) 2019 Load Impact
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package executor
import (
"context"
"fmt"
"math"
"math/big"
"sync"
"sync/atomic"
"time"
"github.com/sirupsen/logrus"
"gopkg.in/guregu/null.v3"
"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/lib/metrics"
"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/stats"
"github.com/loadimpact/k6/ui/pb"
)
// constantArrivalRateType is the name under which this executor type is
// registered and referenced in script options.
const constantArrivalRateType = "constant-arrival-rate"
// init registers the constant arrival-rate executor config type, so its
// configuration can be constructed from raw JSON by name.
func init() {
	buildConfig := func(name string, rawJSON []byte) (lib.ExecutorConfig, error) {
		config := NewConstantArrivalRateConfig(name)
		// The config is returned alongside the error, matching the
		// constructor contract expected by the registry.
		err := lib.StrictJSONUnmarshal(rawJSON, &config)
		return config, err
	}
	lib.RegisterExecutorConfigType(constantArrivalRateType, buildConfig)
}
// ConstantArrivalRateConfig stores config for the constant arrival-rate executor
type ConstantArrivalRateConfig struct {
	BaseConfig

	// Rate is the number of iterations to start per TimeUnit.
	Rate null.Int `json:"rate"`
	// TimeUnit is the period over which Rate iterations are started
	// (defaults to 1s — see NewConstantArrivalRateConfig).
	TimeUnit types.NullDuration `json:"timeUnit"`
	// Duration is the total time the executor will run for.
	Duration types.NullDuration `json:"duration"`

	// Initialize `PreAllocatedVUs` number of VUs, and if more than that are needed,
	// they will be dynamically allocated, until `MaxVUs` is reached, which is an
	// absolutely hard limit on the number of VUs the executor will use
	PreAllocatedVUs null.Int `json:"preAllocatedVUs"`
	MaxVUs          null.Int `json:"maxVUs"`
}
// NewConstantArrivalRateConfig returns a ConstantArrivalRateConfig with default values
func NewConstantArrivalRateConfig(name string) *ConstantArrivalRateConfig {
	config := &ConstantArrivalRateConfig{
		BaseConfig: NewBaseConfig(name, constantArrivalRateType),
	}
	// Default time unit is one second; the "valid" flag stays false so a
	// user-supplied value can still be distinguished from the default.
	config.TimeUnit = types.NewNullDuration(time.Second, false)
	return config
}
// Make sure we implement the lib.ExecutorConfig interface
// (compile-time assertion only; the value is never used).
var _ lib.ExecutorConfig = &ConstantArrivalRateConfig{}
// GetPreAllocatedVUs is just a helper method that returns the scaled pre-allocated VUs.
func (carc ConstantArrivalRateConfig) GetPreAllocatedVUs(et *lib.ExecutionTuple) int64 {
	// Scale the configured value to this instance's execution segment.
	scaledVUs := et.ScaleInt64(carc.PreAllocatedVUs.Int64)
	return scaledVUs
}
// GetMaxVUs is just a helper method that returns the scaled max VUs.
func (carc ConstantArrivalRateConfig) GetMaxVUs(et *lib.ExecutionTuple) int64 {
	// Scale the configured value to this instance's execution segment.
	scaledVUs := et.ScaleInt64(carc.MaxVUs.Int64)
	return scaledVUs
}
// GetDescription returns a human-readable description of the executor options
func (carc ConstantArrivalRateConfig) GetDescription(et *lib.ExecutionTuple) string {
	scaledPreAllocVUs, scaledMaxVUs := carc.GetPreAllocatedVUs(et), carc.GetMaxVUs(et)

	// Show a single number when no dynamic allocation beyond the
	// pre-allocated VUs is possible, and a range otherwise.
	vusInfo := fmt.Sprintf("maxVUs: %d", scaledPreAllocVUs)
	if scaledMaxVUs > scaledPreAllocVUs {
		vusInfo = fmt.Sprintf("maxVUs: %d-%d", scaledPreAllocVUs, scaledMaxVUs)
	}

	// Scale the configured rate by the same ratio the max VUs were scaled
	// by, so the description reflects what this segment actually runs.
	var iterationsPerSec float64
	if scaledMaxVUs != 0 { // TODO: do something better?
		segmentRatio := big.NewRat(scaledMaxVUs, carc.MaxVUs.Int64)
		rate := big.NewRat(
			carc.Rate.Int64,
			int64(time.Duration(carc.TimeUnit.Duration)),
		)
		rate.Mul(rate, segmentRatio)
		iterationsPerSec, _ = getArrivalRatePerSec(rate).Float64()
	}

	return fmt.Sprintf("%.2f iterations/s for %s%s", iterationsPerSec,
		carc.Duration.Duration, carc.getBaseInfo(vusInfo))
}
// Validate makes sure all options are configured and valid
func (carc *ConstantArrivalRateConfig) Validate() []error {
	errors := carc.BaseConfig.Validate()

	// The rate must be present and strictly positive.
	switch {
	case !carc.Rate.Valid:
		errors = append(errors, fmt.Errorf("the iteration rate isn't specified"))
	case carc.Rate.Int64 <= 0:
		errors = append(errors, fmt.Errorf("the iteration rate should be more than 0"))
	}

	if time.Duration(carc.TimeUnit.Duration) <= 0 {
		errors = append(errors, fmt.Errorf("the timeUnit should be more than 0"))
	}

	// The duration must be present and no shorter than the global minimum.
	switch {
	case !carc.Duration.Valid:
		errors = append(errors, fmt.Errorf("the duration is unspecified"))
	case time.Duration(carc.Duration.Duration) < minDuration:
		errors = append(errors, fmt.Errorf(
			"the duration should be at least %s, but is %s", minDuration, carc.Duration,
		))
	}

	switch {
	case !carc.PreAllocatedVUs.Valid:
		errors = append(errors, fmt.Errorf("the number of preAllocatedVUs isn't specified"))
	case carc.PreAllocatedVUs.Int64 < 0:
		errors = append(errors, fmt.Errorf("the number of preAllocatedVUs shouldn't be negative"))
	}

	// An unspecified maxVUs defaults to preAllocatedVUs; otherwise it must
	// not be smaller than preAllocatedVUs.
	if !carc.MaxVUs.Valid {
		// TODO: don't change the config while validating
		carc.MaxVUs.Int64 = carc.PreAllocatedVUs.Int64
	} else if carc.MaxVUs.Int64 < carc.PreAllocatedVUs.Int64 {
		errors = append(errors, fmt.Errorf("maxVUs shouldn't be less than preAllocatedVUs"))
	}

	return errors
}
// GetExecutionRequirements returns the number of required VUs to run the
// executor for its whole duration (disregarding any startTime), including the
// maximum waiting time for any iterations to gracefully stop. This is used by
// the execution scheduler in its VU reservation calculations, so it knows how
// many VUs to pre-initialize.
func (carc ConstantArrivalRateConfig) GetExecutionRequirements(et *lib.ExecutionTuple) []lib.ExecutionStep {
	scaledPreAllocVUs := et.ScaleInt64(carc.PreAllocatedVUs.Int64)
	scaledMaxVUs := et.ScaleInt64(carc.MaxVUs.Int64)

	// VUs are needed for the full duration plus the graceful-stop window.
	startStep := lib.ExecutionStep{
		TimeOffset:      0,
		PlannedVUs:      uint64(scaledPreAllocVUs),
		MaxUnplannedVUs: uint64(scaledMaxVUs - scaledPreAllocVUs),
	}
	endStep := lib.ExecutionStep{
		TimeOffset:      time.Duration(carc.Duration.Duration + carc.GracefulStop.Duration),
		PlannedVUs:      0,
		MaxUnplannedVUs: 0,
	}
	return []lib.ExecutionStep{startStep, endStep}
}
// NewExecutor creates a new ConstantArrivalRate executor
func (carc ConstantArrivalRateConfig) NewExecutor(
	es *lib.ExecutionState, logger *logrus.Entry,
) (lib.Executor, error) {
	executor := &ConstantArrivalRate{
		BaseExecutor: NewBaseExecutor(&carc, es, logger),
		config:       carc,
	}
	return executor, nil
}
// HasWork reports whether there is any work to be done for the given execution segment.
func (carc ConstantArrivalRateConfig) HasWork(et *lib.ExecutionTuple) bool {
	// There is work only if this segment's scaled VU cap is non-zero.
	maxVUs := carc.GetMaxVUs(et)
	return maxVUs > 0
}
// ConstantArrivalRate tries to execute a specific number of iterations for a
// specific period.
type ConstantArrivalRate struct {
	*BaseExecutor
	config ConstantArrivalRateConfig
	// et is the execution tuple recomputed in Init() from the configured
	// MaxVUs value; used for rate scaling and striped-offset scheduling in Run().
	et *lib.ExecutionTuple
}
// Make sure we implement the lib.Executor interface.
// (compile-time assertion only; the value is never used).
var _ lib.Executor = &ConstantArrivalRate{}
// Init values needed for the execution
func (car *ConstantArrivalRate) Init(ctx context.Context) error {
	// err should always be nil, because Init() won't be called for executors
	// with no work, as determined by their config's HasWork() method.
	newTuple, err := car.BaseExecutor.executionState.ExecutionTuple.GetNewExecutionTupleFromValue(
		car.config.MaxVUs.Int64,
	)
	car.et = newTuple
	return err
}
// Run executes a constant number of iterations per second.
//
// TODO: Split this up and make an independent component that can be reused
// between the constant and ramping arrival rate executors - that way we can
// keep the complexity in one well-architected part (with short methods and few
// lambdas :D), while having both config frontends still be present for maximum
// UX benefits. Basically, keep the progress bars and scheduling (i.e. at what
// time should iteration X begin) different, but keep everything else the same.
// This will allow us to implement https://github.com/loadimpact/k6/issues/1386
// and things like all of the TODOs below in one place only.
//nolint:funlen
func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- stats.SampleContainer) (err error) {
	gracefulStop := car.config.GetGracefulStop()
	duration := time.Duration(car.config.Duration.Duration)
	preAllocatedVUs := car.config.GetPreAllocatedVUs(car.executionState.ExecutionTuple)
	maxVUs := car.config.GetMaxVUs(car.executionState.ExecutionTuple)
	// TODO: refactor and simplify
	arrivalRate := getScaledArrivalRate(car.et.Segment, car.config.Rate.Int64, time.Duration(car.config.TimeUnit.Duration))
	tickerPeriod := time.Duration(getTickerPeriod(arrivalRate).Duration)
	arrivalRatePerSec, _ := getArrivalRatePerSec(arrivalRate).Float64()

	// Make sure the log and the progress bar have accurate information
	car.logger.WithFields(logrus.Fields{
		"maxVUs": maxVUs, "preAllocatedVUs": preAllocatedVUs, "duration": duration,
		"tickerPeriod": tickerPeriod, "type": car.config.GetType(),
	}).Debug("Starting executor run...")

	activeVUsWg := &sync.WaitGroup{}

	// returnedVUs is closed by the allocator goroutine below once it has
	// drained all active VUs from the buffer, so the deferred cleanup knows
	// no VU is mid-iteration before it cancels the contexts.
	returnedVUs := make(chan struct{})
	startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, duration, gracefulStop)

	defer func() {
		// Make sure all VUs aren't executing iterations anymore, for the cancel()
		// below to deactivate them.
		<-returnedVUs
		cancel()
		activeVUsWg.Wait()
	}()

	// activeVUs is a buffer of idle, activated VUs; an iteration takes one
	// out and returns it when done. Capacity maxVUs means sends never block.
	activeVUs := make(chan lib.ActiveVU, maxVUs)
	activeVUsCount := uint64(0)

	activationParams := getVUActivationParams(maxDurationCtx, car.config.BaseConfig,
		func(u lib.InitializedVU) {
			// Return the VU to the global pool and mark it as done for the
			// wait group, once it's deactivated.
			car.executionState.ReturnVU(u, true)
			activeVUsWg.Done()
		})
	activateVU := func(initVU lib.InitializedVU) lib.ActiveVU {
		activeVUsWg.Add(1)
		activeVU := initVU.Activate(activationParams)
		car.executionState.ModCurrentlyActiveVUsCount(+1)
		atomic.AddUint64(&activeVUsCount, 1)
		return activeVU
	}

	// Budget of dynamically-allocatable VUs beyond the pre-allocated ones.
	remainingUnplannedVUs := maxVUs - preAllocatedVUs

	// The allocator goroutine below initializes one unplanned VU per signal
	// on this channel; closing it (deferred) lets that goroutine proceed to
	// draining the active VUs during shutdown.
	makeUnplannedVUCh := make(chan struct{})
	defer close(makeUnplannedVUCh)
	go func() {
		defer close(returnedVUs)
		defer func() {
			// this is done here as to not have an unplannedVU in the middle of initialization when
			// starting to return activeVUs
			for i := uint64(0); i < atomic.LoadUint64(&activeVUsCount); i++ {
				<-activeVUs
			}
		}()
		for range makeUnplannedVUCh {
			car.logger.Debug("Starting initialization of an unplanned VU...")
			initVU, err := car.executionState.GetUnplannedVU(maxDurationCtx, car.logger)
			if err != nil {
				// TODO figure out how to return it to the Run goroutine
				car.logger.WithError(err).Error("Error while allocating unplanned VU")
			} else {
				car.logger.Debug("The unplanned VU finished initializing successfully!")
				activeVUs <- activateVU(initVU)
			}
		}
	}()

	// Get the pre-allocated VUs in the local buffer
	for i := int64(0); i < preAllocatedVUs; i++ {
		initVU, err := car.executionState.GetPlannedVU(car.logger, false)
		if err != nil {
			return err
		}
		activeVUs <- activateVU(initVU)
	}

	// Progress bar: shows busy/total VUs, elapsed/total duration and the
	// target iteration rate.
	vusFmt := pb.GetFixedLengthIntFormat(maxVUs)
	progIters := fmt.Sprintf(
		pb.GetFixedLengthFloatFormat(arrivalRatePerSec, 0)+" iters/s", arrivalRatePerSec)
	progressFn := func() (float64, []string) {
		spent := time.Since(startTime)
		currActiveVUs := atomic.LoadUint64(&activeVUsCount)
		// VUs sitting in the buffer are idle; busy = active - idle.
		vusInBuffer := uint64(len(activeVUs))
		progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs",
			currActiveVUs-vusInBuffer, currActiveVUs)

		right := []string{progVUs, duration.String(), progIters}

		if spent > duration {
			return 1, right
		}

		spentDuration := pb.GetFixedLengthDuration(spent, duration)
		progDur := fmt.Sprintf("%s/%s", spentDuration, duration)
		right[1] = progDur

		return math.Min(1, float64(spent)/float64(duration)), right
	}
	car.progress.Modify(pb.WithProgress(progressFn))
	go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, &car, progressFn)

	runIterationBasic := getIterationRunner(car.executionState, car.logger)
	runIteration := func(vu lib.ActiveVU) {
		runIterationBasic(maxDurationCtx, vu)
		// Give the VU back to the idle buffer for the next iteration.
		activeVUs <- vu
	}

	// Iteration start times are scheduled from the segment's striped offsets:
	// the gi counter advances by the offset pattern, and each start time is
	// gi times the unscaled ticker period from startTime.
	start, offsets, _ := car.et.GetStripedOffsets()
	// NOTE(review): the 24h initial timer value is just a placeholder; it is
	// Reset() to the real deadline before every select below.
	timer := time.NewTimer(time.Hour * 24)
	// here the we need the not scaled one
	notScaledTickerPeriod := time.Duration(
		getTickerPeriod(
			big.NewRat(
				car.config.Rate.Int64,
				int64(time.Duration(car.config.TimeUnit.Duration)),
			)).Duration)
	shownWarning := false
	metricTags := car.getMetricTags(nil)
	for li, gi := 0, start; ; li, gi = li+1, gi+offsets[li%len(offsets)] {
		// Time remaining until this iteration's scheduled start (may be
		// negative if we're behind, firing the timer immediately).
		t := notScaledTickerPeriod*time.Duration(gi) - time.Since(startTime)
		timer.Reset(t)
		select {
		case <-timer.C:
			select {
			case vu := <-activeVUs: // ideally, we get the VU from the buffer without any issues
				go runIteration(vu) //TODO: refactor so we dont spin up a goroutine for each iteration
				continue
			default: // no free VUs currently available
			}

			// Since there aren't any free VUs available, consider this iteration
			// dropped - we aren't going to try to recover it, but
			stats.PushIfNotDone(parentCtx, out, stats.Sample{
				Value: 1, Metric: metrics.DroppedIterations,
				Tags: metricTags, Time: time.Now(),
			})

			// We'll try to start allocating another VU in the background,
			// non-blockingly, if we have remainingUnplannedVUs...
			if remainingUnplannedVUs == 0 {
				if !shownWarning {
					car.logger.Warningf("Insufficient VUs, reached %d active VUs and cannot initialize more", maxVUs)
					shownWarning = true
				}
				continue
			}

			select {
			case makeUnplannedVUCh <- struct{}{}: // great!
				remainingUnplannedVUs--
			default: // we're already allocating a new VU
			}

		case <-regDurationCtx.Done():
			// The regular (non-graceful-stop) duration is over; the deferred
			// cleanup above handles draining and deactivating VUs.
			return nil
		}
	}
}