diff --git a/internal/helloworld/greeter_server.go b/internal/helloworld/greeter_server.go
index 7b09323a..73ec43fb 100644
--- a/internal/helloworld/greeter_server.go
+++ b/internal/helloworld/greeter_server.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"io"
 	"math/rand"
+	"strings"
 	"sync"
 	"time"
 
@@ -220,6 +221,32 @@ func (c *HWStatsHandler) GetConnectionCount() int {
 	return val
 }
 
+// GetCountByWorker returns the number of requests received, grouped by the
+// worker ID embedded in the request name ("worker:<id>")
+func (s *Greeter) GetCountByWorker(key CallType) map[string]int {
+	s.mutex.Lock()
+	val, ok := s.calls[key]
+	s.mutex.Unlock()
+
+	if !ok {
+		return nil
+	}
+
+	counts := make(map[string]int)
+
+	for _, reqs := range val {
+		for _, req := range reqs {
+			name := req.GetName()
+			if strings.Contains(name, "worker:") {
+				parts := strings.Split(name, ":")
+				wid := parts[len(parts)-1]
+				counts[wid]++
+			}
+		}
+	}
+
+	return counts
+}
+
 // HandleConn handle the connection
 func (c *HWStatsHandler) HandleConn(ctx context.Context, cs stats.ConnStats) {
 	// no-op
diff --git a/load/pacer.go b/load/pacer.go
new file mode 100644
index 00000000..573afb51
--- /dev/null
+++ b/load/pacer.go
@@ -0,0 +1,322 @@
+package load
+
+import (
+	"fmt"
+	"math"
+	"sync"
+	"time"
+)
+
+// nano is the number of nanoseconds in a second
+const nano = 1e9
+
+// A Pacer defines the control interface to control the rate of hits.
+type Pacer interface {
+	// Pace returns the duration the attacker should wait until
+	// making the next hit, given an already elapsed duration and
+	// completed hits. If the second return value is true, the attacker
+	// should stop sending hits.
+	Pace(elapsed time.Duration, hits uint64) (wait time.Duration, stop bool)
+
+	// Rate returns a Pacer's instantaneous hit rate (per second)
+	// at the given elapsed duration of an attack.
+	Rate(elapsed time.Duration) float64
+}
+
+// A ConstantPacer defines a constant rate of hits for the target.
+type ConstantPacer struct {
+	Freq uint64 // Frequency of hits per second
+	Max  uint64 // Optional maximum allowed hits
+}
+
+// String returns a pretty-printed description of the ConstantPacer's behaviour:
+// ConstantPacer{Freq: 1} => Constant{1 hits/1s}
+func (cp *ConstantPacer) String() string {
+	return fmt.Sprintf("Constant{%d hits/1s}", cp.Freq)
+}
+
+// Pace determines the length of time to sleep until the next hit is sent.
+func (cp *ConstantPacer) Pace(elapsed time.Duration, hits uint64) (time.Duration, bool) {
+	if cp.Max > 0 && hits >= cp.Max {
+		return 0, true
+	}
+
+	if cp.Freq == 0 {
+		return 0, false // Zero value = infinite rate
+	}
+
+	expectedHits := cp.Freq * uint64(elapsed/nano)
+	if hits < expectedHits {
+		// Running behind, send next hit immediately.
+		return 0, false
+	}
+
+	interval := uint64(nano / int64(cp.Freq))
+	if math.MaxInt64/interval < hits {
+		// We would overflow delta if we continued, so stop the attack.
+		return 0, true
+	}
+
+	delta := time.Duration((hits + 1) * interval)
+	// Zero or negative durations cause time.Sleep to return immediately.
+	return delta - elapsed, false
+}
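For orientation, here is a minimal driver loop for `ConstantPacer.Pace`. This is an illustrative sketch for the review, not part of the diff; it assumes this PR's `github.com/bojand/ghz/load` package path. The caller sleeps for the returned wait, fires one hit per iteration, and exits when the pacer says stop.

```go
package main

import (
	"fmt"
	"time"

	"github.com/bojand/ghz/load"
)

func main() {
	// 2 hits/sec, stop after 10 hits total.
	p := load.ConstantPacer{Freq: 2, Max: 10}

	began, hits := time.Now(), uint64(0)
	for {
		wait, stop := p.Pace(time.Since(began), hits)
		if stop {
			break
		}
		time.Sleep(wait)
		hits++ // fire one hit here
	}
	fmt.Printf("%d hits in %v\n", hits, time.Since(began).Round(time.Second)) // 10 hits in ~5s
}
```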
+
+// Rate returns a ConstantPacer's instantaneous hit rate (i.e. requests per second)
+// at the given elapsed duration of an attack. Since the rate is constant, the
+// return value is independent of the given elapsed duration.
+func (cp *ConstantPacer) Rate(elapsed time.Duration) float64 {
+	return cp.hitsPerNs() * 1e9
+}
+
+// hitsPerNs returns the attack rate this ConstantPacer represents, in
+// fractional hits per nanosecond.
+func (cp *ConstantPacer) hitsPerNs() float64 {
+	return float64(cp.Freq) / nano
+}
+
+// StepPacer paces an attack by starting at the Start rate and changing it
+// by Step every StepDuration, leveling off either at the Stop rate or
+// after LoadDuration has elapsed.
+type StepPacer struct {
+	Start        ConstantPacer
+	Step         int64
+	StepDuration time.Duration
+	Stop         ConstantPacer
+	LoadDuration time.Duration
+	Max          uint64
+
+	once     sync.Once
+	init     bool // TODO: improve this
+	constAt  time.Duration
+	baseHits uint64
+}
+
+func (p *StepPacer) initialize() {
+	if p.StepDuration == 0 {
+		panic("StepPacer.StepDuration cannot be 0")
+	}
+
+	if p.Step == 0 {
+		panic("StepPacer.Step cannot be 0")
+	}
+
+	if p.Start.Freq == 0 {
+		panic("StepPacer.Start.Freq cannot be 0")
+	}
+
+	if p.init {
+		return
+	}
+
+	p.init = true
+
+	if p.LoadDuration > 0 {
+		p.constAt = p.LoadDuration
+
+		if p.Stop.Freq == 0 {
+			steps := p.constAt.Nanoseconds() / p.StepDuration.Nanoseconds()
+			p.Stop.Freq = uint64(int64(p.Start.Freq) + p.Step*steps)
+		}
+	} else if p.Stop.Freq > 0 && p.constAt == 0 {
+		// Find the time at which the step function crosses the stop rate.
+		stopRPS := float64(p.Stop.Freq)
+
+		t := time.Duration(0)
+		if p.Step > 0 {
+			for p.Rate(t) <= stopRPS {
+				t = t + p.StepDuration
+			}
+		} else {
+			for p.Rate(t) >= stopRPS {
+				t = t + p.StepDuration
+			}
+		}
+		p.constAt = t
+	}
+
+	if p.constAt > 0 {
+		p.baseHits = uint64(p.hits(p.constAt))
+	}
+}
+
+// Pace determines the length of time to sleep until the next hit is sent.
+func (p *StepPacer) Pace(elapsed time.Duration, hits uint64) (time.Duration, bool) {
+	if p.Max > 0 && hits >= p.Max {
+		return 0, true
+	}
+
+	p.once.Do(p.initialize)
+
+	expectedHits := p.hits(elapsed)
+	if hits < uint64(expectedHits) {
+		// Running behind, send next hit immediately.
+		return 0, false
+	}
+
+	// Constant phase, after the steps have finished.
+	if p.constAt > 0 && elapsed >= p.constAt {
+		if p.Stop.Freq == 0 {
+			return 0, true
+		}
+
+		return p.Stop.Pace(elapsed-p.constAt, hits-p.baseHits)
+	}
+
+	rate := p.Rate(elapsed)
+	if rate == 0 {
+		// Stepped down to zero and not running behind: nothing left to send.
+		return 0, true
+	}
+
+	interval := nano / rate
+	if n := uint64(interval); n != 0 && math.MaxInt64/n < hits {
+		// We would overflow wait if we continued, so stop the attack.
+		return 0, true
+	}
+
+	delta := float64(hits+1) - expectedHits
+	wait := time.Duration(interval * delta)
+
+	return wait, false
+}
+
+// Rate returns a StepPacer's instantaneous hit rate (i.e. requests per second)
+// at the given elapsed duration of an attack.
+func (p *StepPacer) Rate(elapsed time.Duration) float64 {
+	p.initialize()
+
+	if p.constAt > 0 && elapsed >= p.constAt {
+		return float64(p.Stop.Freq)
+	}
+
+	steps := elapsed.Nanoseconds() / p.StepDuration.Nanoseconds()
+
+	rate := (p.Start.hitsPerNs() + float64(p.Step*steps)/nano) * 1e9
+	if rate < 0 {
+		rate = 0
+	}
+
+	return rate
+}
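To make the step semantics concrete, here is a small probe of `Rate` (an illustrative snippet for this review, not in the diff; it assumes the same `load` package and imports as the sketch above). Starting at 5 RPS and adding 5 RPS every 4s yields a staircase:

```go
p := load.StepPacer{
	Start:        load.ConstantPacer{Freq: 5},
	Step:         5,
	StepDuration: 4 * time.Second,
}

for _, d := range []time.Duration{0, 3 * time.Second, 4 * time.Second, 9 * time.Second} {
	fmt.Printf("t=%-3v rate=%v\n", d, p.Rate(d))
}
// Prints rates 5, 5, 10, 15: the rate changes only at 4s step boundaries.
```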
+
+// hits returns the number of hits that have been sent during an attack
+// lasting t nanoseconds. It returns a float so we can tell exactly how
+// much we've missed our target by when solving numerically in Pace.
+func (p *StepPacer) hits(t time.Duration) float64 {
+	if t < 0 {
+		return 0
+	}
+
+	steps := t.Nanoseconds() / p.StepDuration.Nanoseconds()
+
+	base := p.Start.hitsPerNs() * 1e9
+
+	// Hits sent during the first step.
+	var s float64
+	if steps > 0 {
+		s = p.StepDuration.Seconds() * base
+	} else {
+		s = t.Seconds() * base
+	}
+
+	// Hits sent during the completed steps 1..n-1.
+	for i := int64(1); i < steps; i++ {
+		d := time.Duration(p.StepDuration.Nanoseconds() * i)
+		s = s + p.Rate(d)*p.StepDuration.Seconds()
+	}
+
+	// Hits sent during the partially completed current step.
+	c := float64(0)
+	if steps > 0 {
+		elapsed := time.Duration(t.Nanoseconds() - steps*p.StepDuration.Nanoseconds())
+		c = elapsed.Seconds() * p.Rate(t)
+	}
+
+	return s + c
+}
+
+// LinearPacer paces an attack by starting at the Start rate
+// and increasing it linearly with the given slope (hits/sec per second).
+type LinearPacer struct {
+	Start        ConstantPacer
+	Slope        int64
+	Stop         ConstantPacer
+	LoadDuration time.Duration
+	Max          uint64
+
+	once sync.Once
+	sp   StepPacer
+}
+
+func (p *LinearPacer) initialize() {
+	if p.Start.Freq == 0 {
+		panic("LinearPacer.Start.Freq cannot be 0")
+	}
+
+	if p.Slope == 0 {
+		panic("LinearPacer.Slope cannot be 0")
+	}
+
+	p.once.Do(func() {
+		// A linear pacer is a step pacer with one-second steps of size Slope.
+		p.sp = StepPacer{
+			Start:        p.Start,
+			Step:         p.Slope,
+			StepDuration: time.Second,
+			Stop:         p.Stop,
+			LoadDuration: p.LoadDuration,
+		}
+
+		p.sp.initialize()
+	})
+}
+
+// Pace determines the length of time to sleep until the next hit is sent.
+func (p *LinearPacer) Pace(elapsed time.Duration, hits uint64) (time.Duration, bool) {
+	if p.Max > 0 && hits >= p.Max {
+		return 0, true
+	}
+
+	p.initialize()
+
+	return p.sp.Pace(elapsed, hits)
+}
+
+// Rate returns a LinearPacer's instantaneous hit rate (i.e. requests per second)
+// at the given elapsed duration of an attack.
+func (p *LinearPacer) Rate(elapsed time.Duration) float64 {
+	p.initialize()
+
+	return p.sp.Rate(elapsed)
+}
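Since `LinearPacer` delegates to an internal `StepPacer` with one-second steps, its rate is easy to predict. An illustrative check (not part of the diff, same assumed imports as above):

```go
p := load.LinearPacer{
	Start: load.ConstantPacer{Freq: 10},
	Slope: 2,
}

fmt.Println(p.Rate(0))               // 10
fmt.Println(p.Rate(5 * time.Second)) // 20 = 10 + 2*5
```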
+
+// hits returns the number of hits that have been sent during an attack
+// lasting t nanoseconds. It returns a float so we can tell exactly how
+// much we've missed our target by when solving numerically in Pace.
+func (p *LinearPacer) hits(t time.Duration) float64 {
+	p.initialize()
+
+	return p.sp.hits(t)
+}
diff --git a/load/pacer_test.go b/load/pacer_test.go
new file mode 100644
index 00000000..90376212
--- /dev/null
+++ b/load/pacer_test.go
@@ -0,0 +1,1049 @@
+package load
+
+import (
+	"math"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestConstantPacer(t *testing.T) {
+	t.Parallel()
+
+	for _, tc := range []struct {
+		freq    uint64
+		elapsed time.Duration
+		hits    uint64
+		wait    time.Duration
+		stop    bool
+	}{
+		// 1 hit/sec, 0 hits sent, 0s elapsed => 1s until next hit
+		{
+			freq:    1,
+			elapsed: 0,
+			hits:    0,
+			wait:    1000000000,
+			stop:    false,
+		},
+		// 1 hit/sec, 0 hits sent, 0.1s elapsed => 0.9s until next hit
+		{
+			freq:    1,
+			elapsed: 100 * time.Millisecond,
+			hits:    0,
+			wait:    900 * time.Millisecond,
+			stop:    false,
+		},
+		// 1 hit/sec, 0 hits sent, 1s elapsed => 0s until next hit
+		{
+			freq:    1,
+			elapsed: 1 * time.Second,
+			hits:    0,
+			wait:    0,
+			stop:    false,
+		},
+		// 1 hit/sec, 0 hits sent, 2s elapsed => 0s (-1s) until next hit
+		{
+			freq:    1,
+			elapsed: 2 * time.Second,
+			hits:    0,
+			wait:    0,
+			stop:    false,
+		},
+		// 1 hit/sec, 1 hit sent, 1s elapsed => 1s until next hit
+		{
+			freq:    1,
+			elapsed: 1 * time.Second,
+			hits:    1,
+			wait:    1 * time.Second,
+			stop:    false,
+		},
+		// 1 hit/sec, 2 hits sent, 1s elapsed => 2s until next hit
+		{
+			freq:    1,
+			elapsed: 1 * time.Second,
+			hits:    2,
+			wait:    2 * time.Second,
+			stop:    false,
+		},
+		// 1 hit/sec, 10 hits sent, 1s elapsed => 10s until next hit
+		{
+			freq:    1,
+			elapsed: 1 * time.Second,
+			hits:    10,
+			wait:    10 * time.Second,
+			stop:    false,
+		},
+		// 1 hit/sec, 10 hits sent, 11s elapsed => 0s until next hit
+		{
+			freq:    1,
+			elapsed: 11 * time.Second,
+			hits:    10,
+			wait:    0,
+			stop:    false,
+		},
+		// 2 hit/sec, 9 hits sent, 4.9s elapsed => 100ms until next hit
+		{
+			freq:    2,
+			elapsed: 4900 * time.Millisecond,
+			hits:    9,
+			wait:    100 * time.Millisecond,
+			stop:    false,
+		},
+		// Edge cases.
+		// Zero frequency.
+		{
+			freq:    0,
+			elapsed: 0,
+			hits:    0,
+			wait:    0,
+			stop:    false,
+		},
+		// Very large elapsed duration: hits are far behind the expected
+		// count, so the next hit is sent immediately.
+		{
+			freq:    1,
+			elapsed: time.Duration(math.MaxInt64),
+			hits:    2562048,
+			wait:    0,
+			stop:    false,
+		},
+	} {
+		cp := ConstantPacer{Freq: tc.freq}
+		wait, stop := cp.Pace(tc.elapsed, tc.hits)
+		assert.Equal(t, tc.wait, wait)
+		assert.Equal(t, tc.stop, stop)
+	}
+}
+
+func TestConstantPacer_Rate(t *testing.T) {
+	t.Parallel()
+
+	for _, tc := range []struct {
+		freq    uint64
+		elapsed time.Duration
+		rate    float64
+	}{
+		{
+			freq:    60,
+			elapsed: 0,
+			rate:    60,
+		},
+		{
+			freq:    500,
+			elapsed: 5 * time.Second,
+			rate:    500.0,
+		},
+	} {
+		cp := ConstantPacer{Freq: tc.freq}
+		actual, expected := cp.Rate(tc.elapsed), tc.rate
+		assert.True(t, floatEqual(actual, expected), "%s.Rate(_): actual %f, expected %f", cp, actual, expected)
+	}
+}
+
+// Stolen from https://github.com/google/go-cmp/cmp/cmpopts/equate.go
+// to avoid an unwieldy dependency. Both fraction and margin are set at 1e-6.
+func floatEqual(x, y float64) bool {
+	relMarg := 1e-6 * math.Min(math.Abs(x), math.Abs(y))
+	return math.Abs(x-y) <= math.Max(1e-6, relMarg)
+}
+
+// durationEqual is a similar helper for durations: Pace has discrete
+// inputs and outputs but uses floats internally, and sometimes the
+// floating point imprecision leaks out :-(
+func durationEqual(x, y time.Duration) bool {
+	diff := x - y
+	if diff < 0 {
+		diff = -diff
+	}
+	return diff <= time.Microsecond
+}
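For intuition about the tolerance (hypothetical values, not actual test cases): `floatEqual` accepts whichever is larger of an absolute margin of 1e-6 and a relative margin of 1e-6 of the smaller operand.

```go
floatEqual(100.0, 100.00009) // true: diff 9e-5 is within the 1e-4 relative margin
floatEqual(1e-7, 5e-8)       // true: diff is within the 1e-6 absolute margin
floatEqual(1.0, 1.01)        // false: diff 1e-2 exceeds both margins
```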
+
+func TestLinearPacer(t *testing.T) {
+	t.Parallel()
+
+	for ti, tt := range []struct {
+		// pacer config
+		start        uint64
+		slope        int64
+		stopDuration time.Duration
+		stopRate     uint64
+		// params
+		elapsed time.Duration
+		hits    uint64
+		// expected
+		wait time.Duration
+		stop bool
+	}{
+		// slope: 1, start 1
+		{
+			start:   1,
+			slope:   1,
+			elapsed: 0,
+			hits:    0,
+			wait:    1 * time.Second,
+			stop:    false,
+		},
+		{
+			start:   1,
+			slope:   1,
+			elapsed: 1 * time.Second,
+			hits:    0,
+			wait:    0 * time.Millisecond,
+			stop:    false,
+		},
+		{
+			start:   1,
+			slope:   1,
+			elapsed: 0,
+			hits:    1,
+			wait:    2 * time.Second,
+			stop:    false,
+		},
+		{
+			start:   1,
+			slope:   1,
+			elapsed: 1 * time.Second,
+			hits:    1,
+			wait:    500 * time.Millisecond,
+			stop:    false,
+		},
+		{
+			start:   1,
+			slope:   1,
+			elapsed: 1 * time.Second,
+			hits:    2,
+			wait:    1000 * time.Millisecond,
+			stop:    false,
+		},
+		{
+			start:   1,
+			slope:   1,
+			elapsed: 2 * time.Second,
+			hits:    2,
+			wait:    0 * time.Millisecond,
+			stop:    false,
+		},
+		{
+			start:   1,
+			slope:   1,
+			elapsed: 2500 * time.Millisecond,
+			hits:    5,
+			wait:    500 * time.Millisecond,
+			stop:    false,
+		},
+		// slope: 1, start 5
+		{
+			start:   5,
+			slope:   1,
+			elapsed: 0,
+			hits:    0,
+			wait:    200 * time.Millisecond,
+			stop:    false,
+		},
+		{
+			start:   5,
+			slope:   1,
+			elapsed: 1 * time.Second,
+			hits:    5,
+			wait:    166666 * time.Microsecond,
+			stop:    false,
+		},
+		{
+			start:   5,
+			slope:   1,
+			elapsed: 1200 * time.Millisecond,
+			hits:    5,
+			wait:    0 * time.Microsecond,
+			stop:    false,
+		},
+		{
+			start:   5,
+			slope:   1,
+			elapsed: 2000 * time.Millisecond,
+			hits:    6,
+			wait:    0,
+			stop:    false,
+		},
+		{
+			start:   5,
+			slope:   1,
+			elapsed: 2000 * time.Millisecond,
+			hits:    7,
+			wait:    0,
+			stop:    false,
+		},
+		{
+			start:   5,
+			slope:   1,
+			elapsed: 2000 * time.Millisecond,
+			hits:    11,
+			wait:    142857 * time.Microsecond,
+			stop:    false,
+		},
+		{
+			start:   5,
+			slope:   1,
+			elapsed: 2000 * time.Millisecond,
+			hits:    12,
+			wait:    285714 * time.Microsecond,
+			stop:    false,
+		},
+		// slope: -1, start 20, various elapsed and hits
+		{
+			start:   20,
+			slope:   -1,
+			elapsed: 0,
+			hits:    0,
+			wait:    50 * time.Millisecond,
+			stop:    false,
+		},
+		{
+			start:   20,
+			slope:   -1,
+			elapsed: 1100 * time.Millisecond,
+			hits:    0,
+			wait:    0,
+			stop:    false,
+		},
+		{
+			start:   20,
+			slope:   -1,
+			elapsed: 50 * time.Millisecond,
+			hits:    1,
+			wait:    50 * time.Millisecond,
+			stop:    false,
+		},
+		{
+			start:   20,
+			slope:   -1,
+			elapsed: 50 * time.Millisecond,
+			hits:    19,
+			wait:    950 * time.Millisecond,
+			stop:    false,
+		},
+		{
+			start:   20,
+			slope:   -1,
+			elapsed: 950 * time.Millisecond,
+			hits:    19,
+			wait:    50 * time.Millisecond,
+			stop:    false,
+		},
+		// slope: 1, stop rate
+		{
+			start:    1,
+			slope:    1,
+			elapsed:  0,
+			stopRate: 20,
+			hits:     0,
+			wait:     1 * time.Second,
+			stop:     false,
+		},
+		{
+			start:    1,
+			slope:    1,
+			stopRate: 5,
+			elapsed:  5 * time.Second,
+			hits:     0,
+			wait:     0,
+			stop:     false,
+		},
+		{
+			start:    1,
+			slope:    1,
+			stopRate: 5,
+			elapsed:  5000 * time.Millisecond,
+			hits:     17,
+			wait:     600 * time.Millisecond,
+			stop:     false,
+		},
+		// slope: 1, stop duration
+		{
+			start:   1,
+			slope:   1,
+			elapsed: 0,
+			stopDuration: 5 * time.Second,
+			hits:         0,
+			wait:         1 * time.Second,
+			stop:         false,
+		},
+		{
+			start:        1,
+			slope:        1,
+			stopDuration: 5 * time.Second,
+			elapsed:      2 * time.Second,
+			hits:         0,
+			wait:         0,
+			stop:         false,
+		},
+		{
+			start:        1,
+			slope:        1,
+			stopDuration: 5 * time.Second,
+			elapsed:      2000 * time.Millisecond,
+			hits:         5,
+			wait:         1 * time.Second,
+			stop:         false,
+		},
+		{
+			start:        1,
+			slope:        1,
+			stopDuration: 5 * time.Second,
+			elapsed:      5200 * time.Millisecond,
+			hits:         18,
+			wait:         466666 * time.Microsecond,
+			stop:         false,
+		},
+	} {
+		t.Run(strconv.Itoa(ti), func(t *testing.T) {
+			p := LinearPacer{
+				Start:        ConstantPacer{Freq: tt.start},
+				Slope:        tt.slope,
+				Stop:         ConstantPacer{Freq: tt.stopRate},
+				LoadDuration: tt.stopDuration,
+			}
+
+			wait, stop := p.Pace(tt.elapsed, tt.hits)
+
+			assert.True(t, durationEqual(tt.wait, wait),
+				"%d: %+v.Pace(%s, %d) = (%s, %t); expected (%s, %t)", ti, &p, tt.elapsed, tt.hits, wait, stop, tt.wait, tt.stop)
+
+			assert.Equal(t, tt.stop, stop)
+		})
+	}
+}
+
+func TestLinearPacer_hits(t *testing.T) {
+	t.Skip()
+
+	// TODO: improve this to cover different pacer params
+	p := LinearPacer{
+		Start: ConstantPacer{Freq: 100},
+		Slope: 10,
+	}
+
+	for _, tc := range []struct {
+		elapsed time.Duration
+		hits    float64
+	}{
+		{0, 0},
+		{1 * time.Second, 105},
+		{2 * time.Second, 220},
+		{4 * time.Second, 480},
+		{8 * time.Second, 1120},
+		{16 * time.Second, 2880},
+		{32 * time.Second, 8320},
+		{64 * time.Second, 26880},
+		{128 * time.Second, 94720},
+	} {
+		actual := p.hits(tc.elapsed)
+		expected := tc.hits
+
+		assert.True(t, floatEqual(actual, expected), "%+v.hits(%v) = %v, expected: %v", p, tc.elapsed, actual, expected)
+	}
+}
+
+func TestStepPacer_hits(t *testing.T) {
+	t.Parallel()
+
+	// TODO: improve this to cover different pacer params
+	p := StepPacer{
+		Start:        ConstantPacer{Freq: 10},
+		StepDuration: 4 * time.Second,
+		Step:         10,
+	}
+
+	for _, tc := range []struct {
+		elapsed time.Duration
+		hits    float64
+	}{
+		{0, 0},
+		{1 * time.Second, 10},
+		{2 * time.Second, 20},
+		{6 * time.Second, 80},
+	} {
+		actual := p.hits(tc.elapsed)
+		expected := tc.hits
+
+		assert.True(t, floatEqual(actual, expected), "%+v.hits(%v) = %v, expected: %v", p, tc.elapsed, actual, expected)
+	}
+}
+
+func TestStepPacer_Rate(t *testing.T) {
+	t.Parallel()
+
+	for _, tc := range []struct {
+		// pacer config
+		start        uint64
+		step         int64
+		stepDuration time.Duration
+		stop         uint64
+		stopDuration time.Duration
+		// params
+		elapsed time.Duration
+		// expected
+		rate float64
+	}{
+		// step: 5, start: 1
+		{
+			start:        1,
+			step:         5,
+			stepDuration: 4 * time.Second,
+			stop:         0,
+			stopDuration: 0,
+			elapsed:      0,
+			rate:         1,
+		},
+		{
+			start:        1,
+			step:         5,
+			stepDuration: 4 * time.Second,
+			stop:         0,
+			stopDuration: 0,
+			elapsed:      1 * time.Second,
+			rate:         1,
+		},
+		{
+			start:        1,
+			step:         5,
+			stepDuration: 4 * time.Second,
+			stop:         0,
+			stopDuration: 0,
+			elapsed:      3 * time.Second,
+			rate:         1,
+		},
+		{
+			start:        1,
+			step:         5,
+			stepDuration: 4 * time.Second,
+			stop:         0,
+			stopDuration: 0,
+			elapsed:      4 * time.Second,
+			rate:         6,
+		},
+		{
+			start:        1,
+			step:         5,
+			stepDuration: 4 * time.Second,
+			stop:         0,
+			stopDuration: 0,
+			elapsed:      5 * time.Second,
+			rate:         6,
+		},
+		{
+			start:        5,
+			step:         5,
+			stepDuration: 4 * time.Second,
+			stop:         25,
+			stopDuration: 0,
+			elapsed:      9 * time.Second,
+			rate:         15,
+		},
+		{
+			start:        5,
+			step:         5,
+			stepDuration: 4 * time.Second,
+			stop:         25,
+			stopDuration: 0,
+			elapsed:      12 * time.Second,
+			rate:         20,
+		},
+		{
+			start:        5,
+			step:         5,
+			stepDuration: 4 * time.Second,
+			stop:         25,
+			stopDuration: 0,
+			elapsed:      22 * time.Second,
+			rate:         25,
+		},
+		// start: 5, step: 5, stop duration: 25s
+		{
+			start:        5,
+			step:         5,
+			stepDuration: 4 * time.Second,
+			stop:         0,
+			stopDuration: 25 * time.Second,
+			elapsed:      0,
+			rate:         5,
+		},
+		{
+			start:        5,
+			step:         5,
+			stepDuration: 4 * time.Second,
+			stop:         0,
+			stopDuration: 25 * time.Second,
+			elapsed:      19 * time.Second,
+			rate:         25,
+		},
+		{
+			start:        5,
+			step:         5,
+			stepDuration: 4 * time.Second,
+			stop:         0,
+			stopDuration: 25 * time.Second,
+			elapsed:      20 * time.Second,
+			rate:         30,
+		},
+		{
+			start:        5,
+			step:         5,
+			stepDuration: 4 * time.Second,
+			stop:         0,
+			stopDuration: 25 * time.Second,
+			elapsed:      21 * time.Second,
+			rate:         30,
+		},
+		{
+			start:        5,
+			step:         5,
+			stepDuration: 4 * time.Second,
+			stop:         0,
+			stopDuration: 25 * time.Second,
+			elapsed:      26 * time.Second,
+			rate:         35,
+		},
+		{
+			start:        5,
+			step:         5,
+			stepDuration: 4 * time.Second,
+			stop:         0,
+			stopDuration: 25 * time.Second,
+			elapsed:      31 * time.Second,
+			rate:         35,
+		},
+		// start: 15, step: -5
+		{
+			start:        15,
+			step:         -5,
+			stepDuration: 4 * time.Second,
+			stop:         0,
+			stopDuration: 0,
+			elapsed:      0,
+			rate:         15,
+		},
+		{
+			start:        15,
+			step:         -5,
+			stepDuration: 4 * time.Second,
+			stop:         0,
+			stopDuration: 0,
+			elapsed:      3 * time.Second,
+			rate:         15,
+		},
+		{
+			start:        15,
+			step:         -5,
+			stepDuration: 4 * time.Second,
+			stop:         0,
+			stopDuration: 0,
+			elapsed:      4 * time.Second,
+			rate:         10,
+		},
+		{
+			start:        15,
+			step:         -5,
+			stepDuration: 4 * time.Second,
+			stop:         0,
+			stopDuration: 0,
+			elapsed:      5 * time.Second,
+			rate:         10,
+		},
+		{
+			start:        15,
+			step:         -5,
+			stepDuration: 4 * time.Second,
+			stop:         0,
+			stopDuration: 0,
+			elapsed:      11 * time.Second,
+			rate:         5,
+		},
+		{
+			start:        15,
+			step:         -5,
+			stepDuration: 4 * time.Second,
+			stop:         0,
+			stopDuration: 0,
+			elapsed:      12 * time.Second,
+			rate:         0,
+		},
+		{
+			start:        15,
+			step:         -5,
+			stepDuration: 4 * time.Second,
+			stop:         0,
+			stopDuration: 0,
+			elapsed:      17 * time.Second,
+			rate:         0,
+		},
+		// start: 20, step: -5, stop: 5
+		{
+			start:        20,
+			step:         -5,
+			stepDuration: 4 * time.Second,
+			stop:         5,
+			stopDuration: 0,
+			elapsed:      0,
+			rate:         20,
+		},
+		{
+			start:        20,
+			step:         -5,
+			stepDuration: 4 * time.Second,
+			stop:         5,
+			stopDuration: 0,
+			elapsed:      11 * time.Second,
+			rate:         10,
+		},
+		{
+			start:        20,
+			step:         -5,
+			stepDuration: 4 * time.Second,
+			stop:         5,
+			stopDuration: 0,
+			elapsed:      12 * time.Second,
+			rate:         5,
+		},
+		{
+			start:        20,
+			step:         -5,
+			stepDuration: 4 * time.Second,
+			stop:         5,
+			stopDuration: 0,
+			elapsed:      13 * time.Second,
+			rate:         5,
+		},
+		{
+			start:        20,
+			step:         -5,
+			stepDuration: 4 * time.Second,
+			stop:         5,
+			stopDuration: 0,
+			elapsed:      22 * time.Second,
+			rate:         5,
+		},
+		// start: 20, step: -5, stop duration: 10s
+		{
+			start:        20,
+			step:         -5,
+			stepDuration: 4 * time.Second,
+			stop:         0,
+			stopDuration: 10 * time.Second,
+			elapsed:      0,
+			rate:         20,
+		},
+		{
+			start:        20,
+			step:         -5,
+			stepDuration: 4 * time.Second,
+			stop:         0,
+			stopDuration: 10 * time.Second,
+			elapsed:      9 * time.Second,
+			rate:         10,
+		},
+		{
+			start:        20,
+			step:         -5,
+			stepDuration: 4 * time.Second,
+			stop:         0,
+			stopDuration: 10 * time.Second,
+			elapsed:      10 * time.Second,
+			rate:         10,
+		},
+		{
+			start:        20,
+			step:         -5,
+			stepDuration: 4 * time.Second,
+			stop:         0,
+			stopDuration: 10 * time.Second,
+			elapsed:      11 * time.Second,
+			rate:         10,
+		},
+		{
+			start:        20,
+			step:         -5,
+			stepDuration: 4 * time.Second,
+			stop:         0,
+			stopDuration: 10 * time.Second,
+			elapsed:      15 * time.Second,
+			rate:         10,
+		},
+		{
+			start:        20,
+			step:         -5,
+			stepDuration: 4 * time.Second,
+			stop:         0,
+			stopDuration: 10 * time.Second,
+			elapsed:      22 * time.Second,
+			rate:         10,
+		},
+	} {
+		p := StepPacer{
+			Start:        ConstantPacer{Freq: tc.start},
+			Step:         tc.step,
+			StepDuration: tc.stepDuration,
+			LoadDuration: tc.stopDuration,
+			Stop:         ConstantPacer{Freq: tc.stop},
+		}
+
+		actual := p.Rate(tc.elapsed)
+		expected := tc.rate
+
+		assert.True(t, floatEqual(actual, expected), "%+v.Rate(%v) = %v, expected: %v", p, tc.elapsed, actual, expected)
+	}
+}
+
+func TestStepPacer(t *testing.T) {
+	t.Parallel()
+
+	for ti, tc := range []struct {
+		// pacer config
+		start        uint64
+		step         int64
+		stepDuration time.Duration
+		stop         uint64
+		stopDuration time.Duration
+		// params
+		elapsed time.Duration
+		hits    uint64
+		// expected
+		wait       time.Duration
+		stopResult bool
+	}{
+		// start: 5, step: 5
+		{
+			start:        5,
+			step:         5,
+			stepDuration: 5 * time.Second,
+			stop:         0,
+			stopDuration: 0 * time.Second,
+			elapsed:      0 * time.Second,
+			hits:         0,
+			wait:         200 * time.Millisecond,
+			stopResult:   false,
+		},
+		{
+			start:        5,
+			step:         5,
+			stepDuration: 5 * time.Second,
+			stop:         0,
+			stopDuration: 0 * time.Second,
+			elapsed:      1 * time.Second,
+			hits:         4,
+			wait:         0 * time.Millisecond,
+			stopResult:   false,
+		},
+		{
+			start:        5,
+			step:         5,
+			stepDuration: 5 * time.Second,
+			stop:         0,
+			stopDuration: 0 * time.Second,
+			elapsed:      1 * time.Second,
+			hits:         6,
+			wait:         400 * time.Millisecond,
+			stopResult:   false,
+		},
+		{
+			start:        5,
+			step:         5,
+			stepDuration: 5 * time.Second,
+			stop:         0,
+			stopDuration: 0 * time.Second,
+			elapsed:      4200 * time.Millisecond,
+			hits:         25,
+			wait:         1 * time.Second,
+			stopResult:   false,
+		},
+		{
+			start:        5,
+			step:         5,
+			stepDuration: 5 * time.Second,
+			stop:         0,
+			stopDuration: 0 * time.Second,
+			elapsed:      5000 * time.Millisecond,
+			hits:         25,
+			wait:         100 * time.Millisecond,
+			stopResult:   false,
+		},
+		// start: 5, step: 5, stop: 25
+		{
+			start:        5,
+			step:         5,
+			stepDuration: 5 * time.Second,
+			stop:         25,
+			stopDuration: 0 * time.Second,
+			elapsed:      5000 * time.Millisecond,
+			hits:         25,
+			wait:         100 * time.Millisecond,
+			stopResult:   false,
+		},
+		{
+			start:        5,
+			step:         5,
+			stepDuration: 5 * time.Second,
+			stop:         25,
+			stopDuration: 0 * time.Second,
+			elapsed:      20 * time.Second,
+			hits:         250,
+			wait:         40 * time.Millisecond,
+			stopResult:   false,
+		},
+		{
+			start:        5,
+			step:         5,
+			stepDuration: 5 * time.Second,
+			stop:         25,
+			stopDuration: 0 * time.Second,
+			elapsed:      30 * time.Second,
+			hits:         450,
+			wait:         0 * time.Millisecond,
+			stopResult:   false,
+		},
+		{
+			start:        5,
+			step:         5,
+			stepDuration: 5 * time.Second,
+			stop:         25,
+			stopDuration: 0 * time.Second,
+			elapsed:      30 * time.Second,
+			hits:         500,
+			wait:         40 * time.Millisecond,
+			stopResult:   false,
+		},
+		// start: 5, step: 5, stop duration: 20s
+		{
+			start:        5,
+			step:         5,
+			stepDuration: 5 * time.Second,
+			stop:         0,
+			stopDuration: 20 * time.Second,
+			elapsed:      5000 * time.Millisecond,
+			hits:         25,
+			wait:         100 * time.Millisecond,
+			stopResult:   false,
+		},
+		{
+			start:        5,
+			step:         5,
+			stepDuration: 5 * time.Second,
+			stop:         0,
+			stopDuration: 20 * time.Second,
+			elapsed:      19 * time.Second,
+			hits:         25,
+			wait:         0 * time.Millisecond,
+			stopResult:   false,
+		},
+		{
+			start:        5,
+			step:         5,
+			stepDuration: 5 * time.Second,
+			stop:         0,
+			stopDuration: 20 * time.Second,
+			elapsed:      20 * time.Second,
+			hits:         250,
+			wait:         40 * time.Millisecond,
+			stopResult:   false,
+		},
+		{
+			start:        5,
+			step:         5,
+			stepDuration: 5 * time.Second,
+			stop:         0,
+			stopDuration: 20 * time.Second,
+			elapsed:      30 * time.Second,
+			hits:         400,
+			wait:         0 * time.Millisecond,
+			stopResult:   false,
+		},
+		{
+			start:        5,
+			step:         5,
+			stepDuration: 5 * time.Second,
+			stop:         0,
+			stopDuration: 20 * time.Second,
+			elapsed:      30 * time.Second,
+			hits:         500,
+			wait:         40 * time.Millisecond,
+			stopResult:   false,
+		},
+		// start: 20, step: -5
+		{
+			start:        20,
+			step:         -5,
+			stepDuration: 5 * time.Second,
+			stop:         0,
+			stopDuration: 0,
+			elapsed:      0 * time.Millisecond,
+			hits:         0,
+			wait:         50 * time.Millisecond,
+			stopResult:   false,
+		},
+		{
+			start:        20,
+			step:         -5,
+			stepDuration: 5 * time.Second,
+			stop:         0,
+			stopDuration: 0,
+			elapsed:      5000 * time.Millisecond,
+			hits:         100,
+			wait:         66666666 * time.Nanosecond,
+			stopResult:   false,
+		},
+		{
+			start:        20,
+			step:         -5,
+			stepDuration: 5 * time.Second,
+			stop:         0,
+			stopDuration: 0,
+			elapsed:      20 * time.Second,
+			hits:         249,
+			wait:         0,
+			stopResult:   false,
+		},
+		{
+			start:        20,
+			step:         -5,
+			stepDuration: 5 * time.Second,
+			stop:         0,
+			stopDuration: 0,
+			elapsed:      20 * time.Second,
+			hits:         250,
+			wait:         0,
+			stopResult:   true,
+		},
+		// start: 30, step: -5, stop duration: 20s
+		{
+			start:        30,
+			step:         -5,
+			stepDuration: 5 * time.Second,
+			stop:         0,
+			stopDuration: 20 * time.Second,
+			elapsed:      30 * time.Second,
+			hits:         550,
+			wait:         100 * time.Millisecond,
+			stopResult:   false,
+		},
+	} {
+		t.Run(strconv.Itoa(ti), func(t *testing.T) {
+			p := StepPacer{
+				Start:        ConstantPacer{Freq: tc.start},
+				Step:         tc.step,
+				StepDuration: tc.stepDuration,
+				LoadDuration: tc.stopDuration,
+				Stop:         ConstantPacer{Freq: tc.stop},
+			}
+
+			wait, stop := p.Pace(tc.elapsed, tc.hits)
+
+			assert.Equal(t, tc.wait, wait, "%+v.Pace(%v, %v) = %v, expected: %v", p, tc.elapsed, tc.hits, wait, tc.wait)
+			assert.Equal(t, tc.stopResult, stop, "%+v.Pace(%v, %v) = %v, expected: %v", p, tc.elapsed, tc.hits, stop, tc.stopResult)
+		})
+	}
+}
diff --git a/load/worker_ticker.go b/load/worker_ticker.go
new file mode 100644
index 00000000..1628c949
--- /dev/null
+++ b/load/worker_ticker.go
@@ -0,0 +1,142 @@
+package load
+
+import (
+	"time"
+)
+
+// WorkerTicker is the interface for a worker ticker, which controls the
+// number of concurrent workers over the course of a run
+type WorkerTicker interface {
+	Ticker() <-chan TickValue
+	Run()
+	Finish()
+}
+
+// TickValue is the delta to apply to the current worker count
+type TickValue struct {
+	Delta int
+}
+
+// ConstWorkerTicker is a worker ticker that starts a constant number of workers
+type ConstWorkerTicker struct {
+	C chan TickValue
+	N uint
+}
+
+// Ticker returns the ticker channel
+func (c *ConstWorkerTicker) Ticker() <-chan TickValue {
+	return c.C
+}
+
+// Run runs the ticker
+func (c *ConstWorkerTicker) Run() {
+	c.C <- TickValue{Delta: int(c.N)}
+}
+
+// Finish stops the ticker
+func (c *ConstWorkerTicker) Finish() {
+	close(c.C)
+}
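A minimal sketch of how a `WorkerTicker` is consumed (illustrative, not part of the diff; assumes the `load` package above): the consumer applies each `TickValue` delta to its worker pool.

```go
wt := &load.ConstWorkerTicker{N: 5, C: make(chan load.TickValue)}

go func() {
	wt.Run()    // publishes TickValue{Delta: 5}
	wt.Finish() // closes the channel, ending the range loop below
}()

workers := 0
for tv := range wt.Ticker() {
	workers += tv.Delta // negative deltas would stop workers
}
fmt.Println("worker count:", workers) // 5
```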
+
+// StepWorkerTicker is a worker ticker that adjusts the worker count
+// in steps at a given time interval
+type StepWorkerTicker struct {
+	C chan TickValue
+
+	Start        uint
+	Step         int
+	StepDuration time.Duration
+	Stop         uint
+	LoadDuration time.Duration
+}
+
+// Ticker returns the ticker channel
+func (c *StepWorkerTicker) Ticker() <-chan TickValue {
+	return c.C
+}
+
+// Run runs the ticker
+func (c *StepWorkerTicker) Run() {
+	stepUp := c.Step > 0
+	wc := int(c.Start)
+	done := make(chan bool)
+
+	ticker := time.NewTicker(c.StepDuration)
+	defer ticker.Stop()
+
+	begin := time.Now()
+
+	c.C <- TickValue{Delta: int(c.Start)}
+
+	go func() {
+		for range ticker.C {
+			if c.LoadDuration > 0 && time.Since(begin) > c.LoadDuration {
+				// Load duration reached; jump to the stop count if one is set.
+				if c.Stop > 0 {
+					c.C <- TickValue{Delta: int(c.Stop) - wc}
+				}
+
+				done <- true
+				return
+			} else if (c.Stop > 0 && stepUp && wc >= int(c.Stop)) ||
+				(!stepUp && wc <= int(c.Stop)) || wc <= 0 {
+				// Stop count reached, or stepped down to zero workers.
+				done <- true
+				return
+			} else {
+				c.C <- TickValue{Delta: c.Step}
+				wc = wc + c.Step
+			}
+		}
+	}()
+
+	<-done
+}
+
+// Finish stops the ticker
+func (c *StepWorkerTicker) Finish() {
+	close(c.C)
+}
+
+// LineWorkerTicker is a worker ticker that adjusts the worker count
+// linearly, using one-second steps of size Slope
+type LineWorkerTicker struct {
+	C chan TickValue
+
+	Start        uint
+	Slope        int
+	Stop         uint
+	LoadDuration time.Duration
+
+	stepTicker StepWorkerTicker
+}
+
+// Ticker returns the ticker channel
+func (c *LineWorkerTicker) Ticker() <-chan TickValue {
+	return c.C
+}
+
+// Run runs the ticker
+func (c *LineWorkerTicker) Run() {
+	c.stepTicker = StepWorkerTicker{
+		C:            c.C,
+		Start:        c.Start,
+		Step:         c.Slope,
+		StepDuration: 1 * time.Second,
+		Stop:         c.Stop,
+		LoadDuration: c.LoadDuration,
+	}
+
+	c.stepTicker.Run()
+}
+
+// Finish stops the ticker
+func (c *LineWorkerTicker) Finish() {
+	c.stepTicker.Finish()
+}
diff --git a/runner/counter.go b/runner/counter.go
new file mode 100644
index 00000000..09b47fd6
--- /dev/null
+++ b/runner/counter.go
@@ -0,0 +1,23 @@
+package runner
+
+import "sync/atomic"
+
+// RequestCounter is the interface for reading the current request count
+type RequestCounter interface {
+	Get() uint64
+}
+
+// Counter is an atomic implementation of RequestCounter
+type Counter struct {
+	c uint64
+}
+
+// Get retrieves the current count
+func (c *Counter) Get() uint64 {
+	return atomic.LoadUint64(&c.c)
+}
+
+// Inc increases the current count
+func (c *Counter) Inc() uint64 {
+	return atomic.AddUint64(&c.c, 1)
+}
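Within this PR the counter is shared between the pacer loop, which increments it once per tick, and the workers, which read it for request numbering. A hypothetical standalone usage, just to show the semantics:

```go
var c runner.Counter // zero value is ready to use

c.Inc()
c.Inc()
fmt.Println(c.Get()) // 2; Get and Inc are safe to call from multiple goroutines
```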
diff --git a/runner/requester.go b/runner/requester.go
index ca4894ee..d387bfb4 100644
--- a/runner/requester.go
+++ b/runner/requester.go
@@ -10,6 +10,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/bojand/ghz/load"
 	"github.com/bojand/ghz/protodesc"
 	"github.com/jhump/protoreflect/desc"
 	"github.com/jhump/protoreflect/dynamic"
@@ -52,12 +53,11 @@ type Requester struct {
 
 	qpsTick time.Duration
 
-	reqCounter int64
-
 	arrayJSONData []string
 
-	stopReason StopReason
-
 	lock sync.Mutex
+	stopReason StopReason
+	workers    []*Worker
 }
 
 // NewRequester creates a new requester from the passed RunConfig
@@ -66,17 +66,12 @@ func NewRequester(c *RunConfig) (*Requester, error) {
 	var err error
 	var mtd *desc.MethodDescriptor
 
-	var qpsTick time.Duration
-	if c.qps > 0 {
-		qpsTick = time.Duration(1e6/(c.qps)) * time.Microsecond
-	}
-
 	reqr := &Requester{
 		config:     c,
-		qpsTick:    qpsTick,
 		stopReason: ReasonNormalEnd,
 		results:    make(chan *callResult, min(c.c*1000, maxResult)),
-		stopCh:     make(chan bool, c.c),
+		stopCh:     make(chan bool, 1),
+		workers:    make([]*Worker, 0, c.c),
 		conns:      make([]*grpc.ClientConn, 0, c.nConns),
 		stubs:      make([]grpcdynamic.Stub, 0, c.nConns),
 	}
@@ -156,13 +151,16 @@ func NewRequester(c *RunConfig) (*Requester, error) {
 
 // Run makes all the requests and returns a report of results
 // It blocks until all work is done.
 func (b *Requester) Run() (*Report, error) {
-	start := time.Now()
+	defer close(b.stopCh)
 
 	cc, err := b.openClientConns()
 	if err != nil {
 		return nil, err
 	}
 
+	start := time.Now()
+
 	b.lock.Lock()
 	b.start = start
 
@@ -179,7 +177,10 @@ func (b *Requester) Run() (*Report, error) {
 		b.reporter.Run()
 	}()
 
-	err = b.runWorkers()
+	wt := load.ConstWorkerTicker{N: uint(b.config.c), C: make(chan load.TickValue)}
+	pacer := load.ConstantPacer{Freq: uint64(b.config.qps), Max: uint64(b.config.n)}
+
+	err = b.runWorkers(&wt, &pacer)
 
 	report := b.Finish()
 	b.closeClientConns()
@@ -189,18 +190,17 @@ func (b *Requester) Run() (*Report, error) {
 
 // Stop stops the test
 func (b *Requester) Stop(reason StopReason) {
+	b.stopCh <- true
+
 	b.lock.Lock()
 	b.stopReason = reason
-	b.lock.Unlock()
 
 	if b.config.hasLog {
 		b.config.log.Debugf("Stopping with reason: %+v", reason)
 	}
-
-	// Send stop signal so that workers can stop gracefully.
-	for i := 0; i < b.config.c; i++ {
-		b.stopCh <- true
-	}
+	b.lock.Unlock()
 
 	if b.config.zstop == "close" {
 		b.closeClientConns()
@@ -326,59 +326,140 @@ func (b *Requester) newClientConn(withStatsHandler bool) (*grpc.ClientConn, erro
 	return grpc.DialContext(ctx, b.config.host, opts...)
 }
 
-func (b *Requester) runWorkers() error {
-	nReqPerWorker := b.config.n / b.config.c
-
-	if b.config.c == 0 {
-		return nil
-	}
-
-	errC := make(chan error, b.config.c)
-
-	// Ignore the case where b.N % b.C != 0.
-
-	n := 0 // connection counter
-	for i := 0; i < b.config.c; i++ { // concurrency counter
-
-		wID := "g" + strconv.Itoa(i) + "c" + strconv.Itoa(n)
-
-		if len(b.config.name) > 0 {
-			wID = b.config.name + ":" + wID
-		}
-
-		if b.config.hasLog {
-			b.config.log.Debugw("Creating worker with ID: "+wID,
-				"workerID", wID, "requests per worker", nReqPerWorker)
-		}
-
-		w := Worker{
-			stub:          b.stubs[n],
-			mtd:           b.mtd,
-			config:        b.config,
-			stopCh:        b.stopCh,
-			qpsTick:       b.qpsTick,
-			reqCounter:    &b.reqCounter,
-			nReq:          nReqPerWorker,
-			workerID:      wID,
-			arrayJSONData: b.arrayJSONData,
-		}
-
-		n++ // increment connection counter
-
-		// wrap around connections if needed
-		if n == b.config.nConns {
-			n = 0
-		}
-
-		go func() {
-			errC <- w.runWorker()
-		}()
-	}
+func (b *Requester) runWorkers(wt load.WorkerTicker, p load.Pacer) error {
+	wct := wt.Ticker()
+
+	var wm sync.Mutex
+
+	// worker control ticker goroutine
+	go func() {
+		wt.Run()
+	}()
+
+	errC := make(chan error, b.config.c)
+	done := make(chan struct{})
+	ticks := make(chan struct{})
+	counter := Counter{}
+
+	// worker ticker consumer: applies each delta by starting or stopping workers
+	go func() {
+		n := 0 // connection counter
+		for tv := range wct {
+			if tv.Delta > 0 {
+				wID := "g" + strconv.Itoa(len(b.workers)+1) + "c" + strconv.Itoa(n)
+
+				if len(b.config.name) > 0 {
+					wID = b.config.name + ":" + wID
+				}
+
+				if b.config.hasLog {
+					b.config.log.Debugw("Creating worker with ID: "+wID,
+						"workerID", wID)
+				}
+
+				w := Worker{
+					ticks:         ticks,
+					active:        true,
+					counter:       &counter,
+					stub:          b.stubs[n],
+					mtd:           b.mtd,
+					config:        b.config,
+					stopCh:        make(chan bool),
+					workerID:      wID,
+					arrayJSONData: b.arrayJSONData,
+				}
+
+				n++ // increment connection counter
+
+				// wrap around connections if needed
+				if n == b.config.nConns {
+					n = 0
+				}
+
+				wm.Lock()
+				b.workers = append(b.workers, &w)
+				wm.Unlock()
+
+				go func() {
+					errC <- w.runWorker()
+				}()
+			} else {
+				// Negative delta: stop that many active workers.
+				nd := -tv.Delta
+				wm.Lock()
+				wdc := 0
+				for _, wrk := range b.workers {
+					if wdc == nd {
+						break
+					}
+
+					if wrk.active {
+						wrk.Stop()
+						wdc++
+					}
+				}
+				wm.Unlock()
+			}
+		}
+	}()
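The consumer above treats positive deltas as "start N workers" and negative deltas as "stop N workers". A hypothetical configuration (not in the diff) that exercises the negative path:

```go
wt := &load.StepWorkerTicker{
	C:            make(chan load.TickValue),
	Start:        10,
	Step:         -2,
	StepDuration: 5 * time.Second,
}
// Emits TickValue{Delta: 10}, then TickValue{Delta: -2} every 5s,
// so the consumer shrinks the pool: 10 -> 8 -> 6 -> ... -> 0.
```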
+
+	// pacer goroutine: issues one tick per hit at the rate set by the pacer
+	go func() {
+		defer close(ticks)
+		defer wt.Finish()
+
+		defer func() {
+			// Stop any workers still active when pacing ends.
+			wm.Lock()
+			nw := len(b.workers)
+			for i := 0; i < nw; i++ {
+				b.workers[i].Stop()
+			}
+			wm.Unlock()
+		}()
+
+		began := time.Now()
+
+		for {
+			wait, stop := p.Pace(time.Since(began), counter.Get())
+
+			if stop {
+				done <- struct{}{}
+				return
+			}
+
+			time.Sleep(wait)
+
+			select {
+			case ticks <- struct{}{}:
+				counter.Inc()
+			case <-b.stopCh:
+				done <- struct{}{}
+				return
+			}
+		}
+	}()
+
+	<-done
 
 	var err error
-	for i := 0; i < b.config.c; i++ {
+	for i := 0; i < len(b.workers); i++ {
 		err = multierr.Append(err, <-errC)
 	}
+
 	return err
 }
diff --git a/runner/worker.go b/runner/worker.go
index f3825b8c..5cf91ab4 100644
--- a/runner/worker.go
+++ b/runner/worker.go
@@ -4,7 +4,7 @@ import (
 	"context"
 	"fmt"
 	"io"
-	"sync/atomic"
+	"sync"
 	"time"
 
 	"github.com/gogo/protobuf/proto"
@@ -22,12 +21,12 @@ type Worker struct {
 	stub grpcdynamic.Stub
 	mtd  *desc.MethodDescriptor
 
-	config     *RunConfig
-	stopCh     chan bool
-	qpsTick    time.Duration
-	reqCounter *int64
-	nReq       int
-	workerID   string
+	config   *RunConfig
+	workerID string
+	active   bool
+	counter  RequestCounter
+	stopCh   chan bool
+	ticks    <-chan struct{}
 
 	// cached messages only for binary
 	cachedMessages []*dynamic.Message
 }
 
 func (w *Worker) runWorker() error {
-	var throttle <-chan time.Time
-	if w.config.qps > 0 {
-		throttle = time.Tick(w.qpsTick)
-	}
-
 	var err error
-	for i := 0; i < w.nReq; i++ {
-		// Check if application is stopped. Do not send into a closed channel.
+	var errMu sync.Mutex
+
+	for {
 		select {
 		case <-w.stopCh:
-			return nil
-		default:
-			if w.config.qps > 0 {
-				<-throttle
+			errMu.Lock()
+			defer errMu.Unlock()
+			return err
+		case <-w.ticks:
+			if w.config.async {
+				// Collect errors under a lock; async requests still in flight
+				// when the worker stops may have their errors dropped.
+				go func() {
+					rErr := w.makeRequest()
+					errMu.Lock()
+					err = multierr.Append(err, rErr)
+					errMu.Unlock()
+				}()
+			} else {
+				rErr := w.makeRequest()
+				err = multierr.Append(err, rErr)
 			}
-
-			rErr := w.makeRequest()
-
-			err = multierr.Append(err, rErr)
 		}
 	}
-	return err
 }
 
-func (w *Worker) makeRequest() error {
+// Stop stops the worker. A stopped worker's runWorker loop returns;
+// the worker must be restarted to be used again.
+func (w *Worker) Stop() {
+	if !w.active {
+		return
+	}
 
-	reqNum := atomic.AddInt64(w.reqCounter, 1)
+	w.active = false
+	w.stopCh <- true
+}
+
+func (w *Worker) makeRequest() error {
+	reqNum := int64(w.counter.Get())
 	ctd := newCallTemplateData(w.mtd, w.config.funcs, w.workerID, reqNum)