Skip to content

Commit

Permalink
Fix constant-arrival-rate executor hanging
Browse files Browse the repository at this point in the history
  • Loading branch information
na-- authored and Ivan Mirić committed Apr 24, 2020
1 parent b3b2a6e commit eb13303
Showing 1 changed file with 13 additions and 16 deletions.
29 changes: 13 additions & 16 deletions lib/executor/constant_arrival_rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,6 @@ func (car ConstantArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC
tickerPeriod := time.Duration(getTickerPeriod(arrivalRate).Duration)
arrivalRatePerSec, _ := getArrivalRatePerSec(arrivalRate).Float64()

startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, duration, gracefulStop)
defer cancel()

// Make sure the log and the progress bar have accurate information
car.logger.WithFields(logrus.Fields{
"maxVUs": maxVUs, "preAllocatedVUs": preAllocatedVUs, "duration": duration,
Expand All @@ -222,12 +219,16 @@ func (car ConstantArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC

activeVUsWg := &sync.WaitGroup{}
defer activeVUsWg.Wait()
// Make sure we put planned and unplanned VUs back in the global buffer

startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, duration, gracefulStop)
defer cancel()

// Make sure all VUs aren't executing iterations anymore, for the cancel()
// above to deactivate them.
defer func() {
currActiveVUs := atomic.LoadUint64(&activeVUsCount)
for i := uint64(0); i < currActiveVUs; i++ {
vu := <-activeVUs
car.executionState.ReturnVU(vu.(lib.InitializedVU), false)
// activeVUsCount is modified only in the loop below, which is done here
for i := uint64(0); i < activeVUsCount; i++ {
<-activeVUs
}
}()

Expand Down Expand Up @@ -279,13 +280,9 @@ func (car ConstantArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC
car.progress.Modify(pb.WithProgress(progresFn))
go trackProgress(ctx, maxDurationCtx, regDurationCtx, &car, progresFn)

regDurationDone := regDurationCtx.Done()
runIterationBasic := getIterationRunner(car.executionState, car.logger)
runIteration := func(vu lib.ActiveVU) {
ctx, cancel := context.WithCancel(maxDurationCtx)
defer cancel()

runIterationBasic(ctx, vu)
runIterationBasic(maxDurationCtx, vu)
activeVUs <- vu
}

Expand All @@ -302,7 +299,7 @@ func (car ConstantArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC
)).Duration)

for li, gi := 0, start; ; li, gi = li+1, gi+offsets[li%len(offsets)] {
var t = notScaledTickerPeriod*time.Duration(gi) - time.Since(startTime)
t := notScaledTickerPeriod*time.Duration(gi) - time.Since(startTime)
timer.Reset(t)
select {
case <-timer.C:
Expand All @@ -312,7 +309,7 @@ func (car ConstantArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC
go runIteration(vu)
default:
if remainingUnplannedVUs == 0 {
//TODO: emit an error metric?
// TODO: emit an error metric?
car.logger.Warningf("Insufficient VUs, reached %d active VUs and cannot allocate more", maxVUs)
break
}
Expand All @@ -323,7 +320,7 @@ func (car ConstantArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC
remainingUnplannedVUs--
go runIteration(activateVU(initVU))
}
case <-regDurationDone:
case <-regDurationCtx.Done():
return nil
}
}
Expand Down

0 comments on commit eb13303

Please sign in to comment.