Skip to content

Commit

Permalink
wip on new pace logic
Browse files Browse the repository at this point in the history
  • Loading branch information
bojand committed Oct 19, 2020
1 parent 9814e0c commit afa1227
Show file tree
Hide file tree
Showing 7 changed files with 1,731 additions and 78 deletions.
32 changes: 32 additions & 0 deletions internal/helloworld/greeter_server.go
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"io"
"math/rand"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -220,6 +221,37 @@ func (c *HWStatsHandler) GetConnectionCount() int {
return val
}

// GetCountByWorker gets count of requests by goroutine
func (s *Greeter) GetCountByWorker(key CallType) map[string]int {
s.mutex.Lock()
val, ok := s.calls[key]
s.mutex.Unlock()

if !ok {
return nil
}

counts := make(map[string]int)

for _, reqs := range val {
for _, req := range reqs {
name := req.GetName()
if strings.Contains(name, "worker:") {
parts := strings.Split(name, ":")
wid := parts[len(parts)-1]
wc, ok := counts[wid]
if !ok {
counts[wid] = 0
}

counts[wid] = wc + 1
}
}
}

return counts
}

// HandleConn handle the connection
func (c *HWStatsHandler) HandleConn(ctx context.Context, cs stats.ConnStats) {
// no-op
Expand Down
322 changes: 322 additions & 0 deletions load/pacer.go
@@ -0,0 +1,322 @@
package load

import (
"fmt"
"math"
"sync"
"time"
)

// nano is the const for number of nanoseconds in a second
const nano = 1e9

// A Pacer defines the control interface to control the rate of hit.
type Pacer interface {
// Pace returns the duration the attacker should wait until
// making next hit, given an already elapsed duration and
// completed hits. If the second return value is true, an attacker
// should stop sending hits.
Pace(elapsed time.Duration, hits uint64) (wait time.Duration, stop bool)

// Rate returns a Pacer's instantaneous hit rate (per seconds)
// at the given elapsed duration of an attack.
Rate(elapsed time.Duration) float64
}

// A PacerFunc is a function adapter type that implements
// the Pacer interface.
// type PacerFunc func(time.Duration, uint64) (time.Duration, bool)

// A ConstantPacer defines a constant rate of hits for the target.
type ConstantPacer struct {
Freq uint64 // Frequency of hits per second
Max uint64 // Optional maximum allowed hits
}

// String returns a pretty-printed description of the ConstantPacer's behaviour:
// ConstantPacer{Freq: 1} => Constant{1 hits/1s}
func (cp *ConstantPacer) String() string {
return fmt.Sprintf("Constant{%d hits/%f}", cp.Freq, nano)
}

// Pace determines the length of time to sleep until the next hit is sent.
func (cp *ConstantPacer) Pace(elapsed time.Duration, hits uint64) (time.Duration, bool) {

if hits >= cp.Max {
return 0, true
}

if cp.Freq == 0 {
return 0, false // Zero value = infinite rate
}

expectedHits := uint64(cp.Freq) * uint64(elapsed/nano)
if hits < expectedHits {
// Running behind, send next hit immediately.
return 0, false
}

interval := uint64(nano / int64(cp.Freq))
if math.MaxInt64/interval < hits {
// We would overflow delta if we continued, so stop the attack.
return 0, true
}

delta := time.Duration((hits + 1) * interval)
// Zero or negative durations cause time.Sleep to return immediately.
return delta - elapsed, false
}

// Rate returns a ConstantPacer's instantaneous hit rate (i.e. requests per second)
// at the given elapsed duration of an attack. Since it's constant, the return
// value is independent of the given elapsed duration.
func (cp *ConstantPacer) Rate(elapsed time.Duration) float64 {
return cp.hitsPerNs() * 1e9
}

// hitsPerNs returns the attack rate this ConstantPacer represents, in
// fractional hits per nanosecond.
func (cp *ConstantPacer) hitsPerNs() float64 {
return float64(cp.Freq) / nano
}

// StepPacer paces an attack by starting at a given request rate
// and increasing with steps at a given time interval and duration.
type StepPacer struct {
Start ConstantPacer
Step int64
StepDuration time.Duration
Stop ConstantPacer
LoadDuration time.Duration
Max uint64

once sync.Once
init bool // TOOO improve this
constAt time.Duration
baseHits uint64
}

func (p *StepPacer) initialize() {

if p.StepDuration == 0 {
panic("StepPacer.StepDuration cannot be 0")
}

if p.Step == 0 {
panic("StepPacer.Step cannot be 0")
}

if p.Start.Freq == 0 {
panic("Start.Freq cannot be 0")
}

if p.init {
return
}

p.init = true

if p.LoadDuration > 0 {
p.constAt = p.LoadDuration

if p.Stop.Freq == 0 {
steps := p.constAt.Nanoseconds() / p.StepDuration.Nanoseconds()

p.Stop.Freq = p.Start.Freq + uint64(int64(p.Step)*steps)
}
} else if p.Stop.Freq > 0 && p.constAt == 0 {
stopRPS := float64(p.Stop.Freq)

if p.Step > 0 {
t := time.Duration(0)
for {
if p.Rate(t) > stopRPS {
p.constAt = t
break
}
t = t + p.StepDuration
}
} else {
t := time.Duration(0)
for {
if p.Rate(t) < stopRPS {
p.constAt = t
break
}
t = t + p.StepDuration
}
}
}

if p.constAt > 0 {
p.baseHits = uint64(p.hits(p.constAt))
}
}

// Pace determines the length of time to sleep until the next hit is sent.
func (p *StepPacer) Pace(elapsed time.Duration, hits uint64) (time.Duration, bool) {

if hits >= p.Max {
return 0, true
}

p.once.Do(p.initialize)

expectedHits := p.hits(elapsed)

if hits < uint64(expectedHits) {
// Running behind, send next hit immediately.
return 0, false
}

// const part
if p.constAt > 0 && elapsed >= p.constAt {
if p.Stop.Freq == 0 {
return 0, true
}

return p.Stop.Pace(elapsed-p.constAt, hits-p.baseHits)
}

rate := p.Rate(elapsed)
interval := nano / rate

if n := uint64(interval); n != 0 && math.MaxInt64/n < hits {
// We would overflow wait if we continued, so stop the attack.
return 0, true
}

delta := float64(hits+1) - expectedHits
wait := time.Duration(interval * delta)

// if wait > nano {
// intervals := elapsed / nano
// wait = (intervals+1)*nano - elapsed
// }

return wait, false
}

// Rate returns a StepPacer's instantaneous hit rate (i.e. requests per second)
// at the given elapsed duration of an attack.
func (p *StepPacer) Rate(elapsed time.Duration) float64 {
p.initialize()

t := elapsed

if p.constAt > 0 && elapsed >= p.constAt {
return float64(p.Stop.Freq)
}

steps := t.Nanoseconds() / p.StepDuration.Nanoseconds()

rate := (p.Start.hitsPerNs() + float64(int64(p.Step)*steps)/nano) * 1e9

if rate < 0 {
rate = 0
}

return rate
}

// hits returns the number of hits that have been sent during an attack
// lasting t nanoseconds. It returns a float so we can tell exactly how
// much we've missed our target by when solving numerically in Pace.
func (p *StepPacer) hits(t time.Duration) float64 {
if t < 0 {
return 0
}

steps := t.Nanoseconds() / p.StepDuration.Nanoseconds()

base := p.Start.hitsPerNs() * 1e9

// first step
var s float64
if steps > 0 {
s = p.StepDuration.Seconds() * base
} else {
s = t.Seconds() * base
}

// previous steps: 1...n
for i := int64(1); i < steps; i++ {
d := time.Duration(p.StepDuration.Nanoseconds() * i)
r := p.Rate(d)
ch := r * p.StepDuration.Seconds()
s = s + ch
}

c := float64(0)
if steps > 0 {
// current step
elapsed := time.Duration(t.Nanoseconds() - steps*p.StepDuration.Nanoseconds())
c = elapsed.Seconds() * p.Rate(t)
}

return s + c
}

// LinearPacer paces an attack by starting at a given request rate
// and increasing linearly with the given slope.
type LinearPacer struct {
Start ConstantPacer
Slope int64
Stop ConstantPacer
LoadDuration time.Duration
Max uint64

once sync.Once
sp StepPacer
}

func (p *LinearPacer) initialize() {
if p.Start.Freq == 0 {
panic("LinearPacer.Start cannot be 0")
}

if p.Slope == 0 {
panic("LinearPacer.Slope cannot be 0")
}

p.once.Do(func() {
p.sp = StepPacer{
Start: p.Start,
Step: p.Slope,
StepDuration: time.Second,
Stop: p.Stop,
LoadDuration: p.LoadDuration,
}

p.sp.initialize()
})
}

// Pace determines the length of time to sleep until the next hit is sent.
func (p *LinearPacer) Pace(elapsed time.Duration, hits uint64) (time.Duration, bool) {
if hits >= p.Max {
return 0, true
}

p.initialize()

return p.sp.Pace(elapsed, hits)
}

// Rate returns a LinearPacer's instantaneous hit rate (i.e. requests per second)
// at the given elapsed duration of an attack.
func (p *LinearPacer) Rate(elapsed time.Duration) float64 {

p.initialize()

return p.sp.Rate(elapsed)
}

// hits returns the number of hits that have been sent during an attack
// lasting t nanoseconds. It returns a float so we can tell exactly how
// much we've missed our target by when solving numerically in Pace.
func (p *LinearPacer) hits(t time.Duration) float64 {
p.initialize()

return p.sp.hits(t)
}

0 comments on commit afa1227

Please sign in to comment.