ramping-arrival-rate: Avoid spawning a new goroutine for every iteration (#1957)

Create a dedicated goroutine to process iterations
when a new ActiveVU is created for the ramping arrival rate executor.

This changes the previous behaviour, where a new goroutine
was created for every new iteration.

The previous approach degraded the expected rate (iterations/s)
when around a thousand PreAllocatedVUs were used.
A benchmark was added to check the rate with several PreAllocatedVUs cases.

Closes #1944

Co-authored-by: Mihail Stoykov <MStoykov@users.noreply.github.com>
Co-authored-by: Ivan Mirić <ivan@imiric.com>
3 people committed May 21, 2021
1 parent f01c2d2 commit 5fe5e5a
Showing 3 changed files with 178 additions and 29 deletions.
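
Before the diffs, here is a minimal, standalone sketch of the pattern this commit introduces: each pre-allocated VU gets one long-lived worker goroutine fed from a shared channel, and the arrival loop makes a non-blocking attempt to hand off each iteration, dropping it when no VU is free. The vuPool type, worker function, and the rates in main are illustrative assumptions for this sketch, not the executor code; the real implementation is the activeVUPool added to lib/executor/ramping_arrival_rate.go below.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// vuPool mirrors the idea of the new activeVUPool: every "VU" has one
// long-lived worker goroutine waiting for iteration requests on a shared
// channel, instead of a new goroutine being spawned per iteration.
type vuPool struct {
	iterations chan struct{}
	running    uint64
	wg         sync.WaitGroup
}

func newVUPool() *vuPool {
	return &vuPool{iterations: make(chan struct{})}
}

// AddVU starts a dedicated worker goroutine for one VU.
func (p *vuPool) AddVU(run func()) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		for range p.iterations {
			atomic.AddUint64(&p.running, 1)
			run()
			atomic.AddUint64(&p.running, ^uint64(0)) // decrement by one
		}
	}()
}

// TryRunIteration hands one iteration to an idle VU, or reports that none
// is free right now (the executor would count such an iteration as dropped).
func (p *vuPool) TryRunIteration() bool {
	select {
	case p.iterations <- struct{}{}:
		return true
	default:
		return false
	}
}

// Close stops accepting iterations and waits for in-flight ones to finish.
func (p *vuPool) Close() {
	close(p.iterations)
	p.wg.Wait()
}

func main() {
	p := newVUPool()
	for i := 0; i < 4; i++ { // pre-allocate 4 "VUs"
		p.AddVU(func() { time.Sleep(10 * time.Millisecond) })
	}

	var started, dropped int
	ticker := time.NewTicker(2 * time.Millisecond) // arrival rate of ~500 iterations/s
	defer ticker.Stop()
	for i := 0; i < 100; i++ {
		<-ticker.C
		if p.TryRunIteration() {
			started++
		} else {
			dropped++ // no free VU at this arrival, drop the iteration
		}
	}
	p.Close()
	fmt.Printf("started=%d dropped=%d\n", started, dropped)
}
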
2 changes: 1 addition & 1 deletion lib/executor/common_test.go
@@ -42,7 +42,7 @@ func simpleRunner(vuFn func(context.Context) error) lib.Runner {
}
}

func setupExecutor(t *testing.T, config lib.ExecutorConfig, es *lib.ExecutionState, runner lib.Runner) (
func setupExecutor(t testing.TB, config lib.ExecutorConfig, es *lib.ExecutionState, runner lib.Runner) (
context.Context, context.CancelFunc, lib.Executor, *testutils.SimpleLogrusHook,
) {
ctx, cancel := context.WithCancel(context.Background())
90 changes: 68 additions & 22 deletions lib/executor/ramping_arrival_rate.go
@@ -333,25 +333,32 @@ func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
returnedVUs := make(chan struct{})
startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, duration, gracefulStop)

vusPool := newActiveVUPool()

defer func() {
vusPool.Close()
// Make sure all VUs aren't executing iterations anymore, for the cancel()
// below to deactivate them.
<-returnedVUs
cancel()
activeVUsWg.Wait()
}()
activeVUs := make(chan lib.ActiveVU, maxVUs)

activeVUsCount := uint64(0)

returnVU := func(u lib.InitializedVU) {
varr.executionState.ReturnVU(u, true)
activeVUsWg.Done()
}

runIterationBasic := getIterationRunner(varr.executionState, varr.logger)
activateVU := func(initVU lib.InitializedVU) lib.ActiveVU {
activeVUsWg.Add(1)
activeVU := initVU.Activate(getVUActivationParams(maxDurationCtx, varr.config.BaseConfig, returnVU))
varr.executionState.ModCurrentlyActiveVUsCount(+1)
atomic.AddUint64(&activeVUsCount, 1)

vusPool.AddVU(maxDurationCtx, activeVU, runIterationBasic)
return activeVU
}

@@ -360,13 +367,7 @@ func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
defer close(makeUnplannedVUCh)
go func() {
defer close(returnedVUs)
defer func() {
// this is done here as to not have an unplannedVU in the middle of initialization when
// starting to return activeVUs
for i := uint64(0); i < atomic.LoadUint64(&activeVUsCount); i++ {
<-activeVUs
}
}()

for range makeUnplannedVUCh {
varr.logger.Debug("Starting initialization of an unplanned VU...")
initVU, err := varr.executionState.GetUnplannedVU(maxDurationCtx, varr.logger)
@@ -375,7 +376,7 @@ func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
varr.logger.WithError(err).Error("Error while allocating unplanned VU")
} else {
varr.logger.Debug("The unplanned VU finished initializing successfully!")
activeVUs <- activateVU(initVU)
activateVU(initVU)
}
}
}()
@@ -386,7 +387,7 @@ func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
if err != nil {
return err
}
activeVUs <- activateVU(initVU)
activateVU(initVU)
}

tickerPeriod := int64(startTickerPeriod.Duration)
@@ -397,9 +398,8 @@ func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
progressFn := func() (float64, []string) {
currActiveVUs := atomic.LoadUint64(&activeVUsCount)
currentTickerPeriod := atomic.LoadInt64(&tickerPeriod)
vusInBuffer := uint64(len(activeVUs))
progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs",
currActiveVUs-vusInBuffer, currActiveVUs)
vusPool.Running(), currActiveVUs)

itersPerSec := 0.0
if currentTickerPeriod > 0 {
@@ -425,12 +425,6 @@ func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, varr, progressFn)

regDurationDone := regDurationCtx.Done()
runIterationBasic := getIterationRunner(varr.executionState, varr.logger)
runIteration := func(vu lib.ActiveVU) {
runIterationBasic(maxDurationCtx, vu)
activeVUs <- vu
}

timer := time.NewTimer(time.Hour)
start := time.Now()
ch := make(chan time.Duration, 10) // buffer 10 iteration times ahead
@@ -456,12 +450,10 @@ func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
}
}

select {
case vu := <-activeVUs: // ideally, we get the VU from the buffer without any issues
go runIteration(vu) //TODO: refactor so we dont spin up a goroutine for each iteration
if vusPool.TryRunIteration() {
continue
default: // no free VUs currently available
}

// Since there aren't any free VUs available, consider this iteration
// dropped - we aren't going to try to recover it, but

@@ -488,3 +480,57 @@ func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
}
return nil
}

// activeVUPool controls the activeVUs
// executing the received requests for iterations.
type activeVUPool struct {
iterations chan struct{}
running uint64
wg sync.WaitGroup
}

// newActiveVUPool returns an activeVUPool.
func newActiveVUPool() *activeVUPool {
return &activeVUPool{
iterations: make(chan struct{}),
}
}

// TryRunIteration invokes a request to execute a new iteration.
// When there are no available VUs to process the request
// then false is returned.
func (p *activeVUPool) TryRunIteration() bool {
select {
case p.iterations <- struct{}{}:
return true
default:
return false
}
}

// Running returns the number of the currently running VUs.
func (p *activeVUPool) Running() uint64 {
return atomic.LoadUint64(&p.running)
}

// AddVU adds the active VU to the pool of VUs for handling the incoming requests.
// When a new request is accepted the runfn function is executed.
func (p *activeVUPool) AddVU(ctx context.Context, avu lib.ActiveVU, runfn func(context.Context, lib.ActiveVU) bool) {
p.wg.Add(1)
go func() {
defer p.wg.Done()

for range p.iterations {
atomic.AddUint64(&p.running, uint64(1))
runfn(ctx, avu)
atomic.AddUint64(&p.running, ^uint64(0))
}
}()
}

// Close stops the pool from accepting requests
// then it will wait for all on-going iterations to complete.
func (p *activeVUPool) Close() {
close(p.iterations)
p.wg.Wait()
}
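
Taken together, the hunks above wire this pool into the executor's lifecycle: the pool is created before the deferred cleanup, each newly activated VU (planned or unplanned) registers a worker via AddVU, the arrival loop hands iterations off with the non-blocking TryRunIteration and counts the iteration as dropped when no VU is free, progress reporting reads Running(), and the deferred Close() waits for in-flight iterations before the VUs are deactivated and returned.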
115 changes: 109 additions & 6 deletions lib/executor/ramping_arrival_rate_test.go
@@ -136,8 +136,8 @@ func TestRampingArrivalRateRunUnplannedVUs(t *testing.T) {
require.NoError(t, err)
es := lib.NewExecutionState(lib.Options{}, et, 1, 3)
var count int64
var ch = make(chan struct{}) // closed when new unplannedVU is started and signal to get to next iterations
var ch2 = make(chan struct{}) // closed when a second iteration was started on an old VU in order to test it won't start a second unplanned VU in parallel or at all
ch := make(chan struct{}) // closed when new unplannedVU is started and signal to get to next iterations
ch2 := make(chan struct{}) // closed when a second iteration was started on an old VU in order to test it won't start a second unplanned VU in parallel or at all
runner := simpleRunner(func(ctx context.Context) error {
cur := atomic.AddInt64(&count, 1)
if cur == 1 {
Expand All @@ -148,7 +148,7 @@ func TestRampingArrivalRateRunUnplannedVUs(t *testing.T) {

return nil
})
var ctx, cancel, executor, logHook = setupExecutor(
ctx, cancel, executor, logHook := setupExecutor(
t, &RampingArrivalRateConfig{
TimeUnit: types.NullDurationFrom(time.Second),
Stages: []Stage{
@@ -200,7 +200,7 @@ func TestRampingArrivalRateRunCorrectRateWithSlowRate(t *testing.T) {
require.NoError(t, err)
es := lib.NewExecutionState(lib.Options{}, et, 1, 3)
var count int64
var ch = make(chan struct{}) // closed when new unplannedVU is started and signal to get to next iterations
ch := make(chan struct{}) // closed when new unplannedVU is started and signal to get to next iterations
runner := simpleRunner(func(ctx context.Context) error {
cur := atomic.AddInt64(&count, 1)
if cur == 1 {
@@ -209,7 +209,7 @@ func TestRampingArrivalRateRunCorrectRateWithSlowRate(t *testing.T) {

return nil
})
var ctx, cancel, executor, logHook = setupExecutor(
ctx, cancel, executor, logHook := setupExecutor(
t, &RampingArrivalRateConfig{
TimeUnit: types.NullDurationFrom(time.Second),
Stages: []Stage{
@@ -223,7 +223,7 @@ func TestRampingArrivalRateRunCorrectRateWithSlowRate(t *testing.T) {
},
es, runner)
defer cancel()
var engineOut = make(chan stats.SampleContainer, 1000)
engineOut := make(chan stats.SampleContainer, 1000)
es.SetInitVUFunc(func(_ context.Context, logger *logrus.Entry) (lib.InitializedVU, error) {
t.Log("init")
cur := atomic.LoadInt64(&count)
@@ -243,6 +243,109 @@ func TestRampingArrivalRateRunCorrectRateWithSlowRate(t *testing.T) {
assert.Equal(t, int64(2), es.GetInitializedVUsCount())
}

func TestRampingArrivalRateRunGracefulStop(t *testing.T) {
t.Parallel()
et, err := lib.NewExecutionTuple(nil, nil)
require.NoError(t, err)
es := lib.NewExecutionState(lib.Options{}, et, 10, 10)

runner := simpleRunner(func(ctx context.Context) error {
time.Sleep(5 * time.Second)
return nil
})
ctx, cancel, executor, _ := setupExecutor(
t, &RampingArrivalRateConfig{
TimeUnit: types.NullDurationFrom(1 * time.Second),
Stages: []Stage{
{
Duration: types.NullDurationFrom(2 * time.Second),
Target: null.IntFrom(10),
},
},
StartRate: null.IntFrom(10),
PreAllocatedVUs: null.IntFrom(10),
MaxVUs: null.IntFrom(10),
BaseConfig: BaseConfig{
GracefulStop: types.NullDurationFrom(5 * time.Second),
},
},
es, runner)
defer cancel()

engineOut := make(chan stats.SampleContainer, 1000)
defer close(engineOut)

err = executor.Run(ctx, engineOut)
assert.NoError(t, err)
assert.Equal(t, int64(0), es.GetCurrentlyActiveVUsCount())
assert.Equal(t, int64(10), es.GetInitializedVUsCount())
assert.Equal(t, uint64(10), es.GetFullIterationCount())
}

func BenchmarkRampingArrivalRateRun(b *testing.B) {
tests := []struct {
prealloc null.Int
}{
{prealloc: null.IntFrom(10)},
{prealloc: null.IntFrom(100)},
{prealloc: null.IntFrom(1e3)},
{prealloc: null.IntFrom(10e3)},
}

for _, tc := range tests {
b.Run(fmt.Sprintf("VUs%d", tc.prealloc.ValueOrZero()), func(b *testing.B) {
engineOut := make(chan stats.SampleContainer, 1000)
defer close(engineOut)
go func() {
for range engineOut {
// discard
}
}()

es := lib.NewExecutionState(lib.Options{}, mustNewExecutionTuple(nil, nil), uint64(tc.prealloc.Int64), uint64(tc.prealloc.Int64))

var count int64
runner := simpleRunner(func(ctx context.Context) error {
atomic.AddInt64(&count, 1)
return nil
})

// a high target to get the highest rate
target := int64(1e9)

ctx, cancel, executor, _ := setupExecutor(
b, &RampingArrivalRateConfig{
TimeUnit: types.NullDurationFrom(1 * time.Second),
Stages: []Stage{
{
Duration: types.NullDurationFrom(0),
Target: null.IntFrom(target),
},
{
Duration: types.NullDurationFrom(5 * time.Second),
Target: null.IntFrom(target),
},
},
PreAllocatedVUs: tc.prealloc,
MaxVUs: tc.prealloc,
},
es, runner)
defer cancel()

b.ResetTimer()
start := time.Now()

err := executor.Run(ctx, engineOut)
took := time.Since(start)
assert.NoError(b, err)

iterations := float64(atomic.LoadInt64(&count))
b.ReportMetric(0, "ns/op")
b.ReportMetric(iterations/took.Seconds(), "iterations/s")
})
}
}

func mustNewExecutionTuple(seg *lib.ExecutionSegment, seq *lib.ExecutionSegmentSequence) *lib.ExecutionTuple {
et, err := lib.NewExecutionTuple(seg, seq)
if err != nil {
