Skip to content

Commit

Permalink
Enhanced rate limiting (#620)
Browse files Browse the repository at this point in the history
* Enhanced rate limiting

* Use time.Ticker correctly
  • Loading branch information
joohoi committed Jan 4, 2023
1 parent 1a684a9 commit 2ce2217
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 110 deletions.
39 changes: 20 additions & 19 deletions pkg/ffuf/job.go
Expand Up @@ -11,7 +11,7 @@ import (
"time"
)

//Job ties together Config, Runner, Input and Output
// Job ties together Config, Runner, Input and Output
type Job struct {
Config *Config
ErrorMutex sync.Mutex
Expand Down Expand Up @@ -63,15 +63,15 @@ func NewJob(conf *Config) *Job {
return &j
}

//incError increments the error counter
// incError increments the error counter
func (j *Job) incError() {
j.ErrorMutex.Lock()
defer j.ErrorMutex.Unlock()
j.ErrorCounter++
j.SpuriousErrorCounter++
}

//inc403 increments the 403 response counter
// inc403 increments the 403 response counter
func (j *Job) inc403() {
j.ErrorMutex.Lock()
defer j.ErrorMutex.Unlock()
Expand All @@ -85,25 +85,25 @@ func (j *Job) inc429() {
j.Count429++
}

//resetSpuriousErrors resets the spurious error counter
// resetSpuriousErrors resets the spurious error counter
func (j *Job) resetSpuriousErrors() {
j.ErrorMutex.Lock()
defer j.ErrorMutex.Unlock()
j.SpuriousErrorCounter = 0
}

//DeleteQueueItem deletes a recursion job from the queue by its index in the slice
// DeleteQueueItem deletes a recursion job from the queue by its index in the slice
func (j *Job) DeleteQueueItem(index int) {
index = j.queuepos + index - 1
j.queuejobs = append(j.queuejobs[:index], j.queuejobs[index+1:]...)
}

//QueuedJobs returns the slice of queued recursive jobs
// QueuedJobs returns the slice of queued recursive jobs
func (j *Job) QueuedJobs() []QueueJob {
return j.queuejobs[j.queuepos-1:]
}

//Start the execution of the Job
// Start the execution of the Job
func (j *Job) Start() {
if j.startTime.IsZero() {
j.startTime = time.Now()
Expand Down Expand Up @@ -182,7 +182,7 @@ func (j *Job) prepareQueueJob() {
j.queuepos += 1
}

//SkipQueue allows to skip the current job and advance to the next queued recursion job
// SkipQueue allows to skip the current job and advance to the next queued recursion job
func (j *Job) SkipQueue() {
j.skipQueue = true
}
Expand Down Expand Up @@ -238,7 +238,7 @@ func (j *Job) startExecution() {
}

//Limiter blocks after reaching the buffer, ensuring limited concurrency
limiter := make(chan bool, j.Config.Threads)
threadlimiter := make(chan bool, j.Config.Threads)

for j.Input.Next() && !j.skipQueue {
// Check if we should stop the process
Expand All @@ -249,23 +249,25 @@ func (j *Job) startExecution() {
break
}
j.pauseWg.Wait()
limiter <- true
// Handle the rate & thread limiting
threadlimiter <- true
// Ratelimiter handles the rate ticker
<-j.Rate.RateLimiter.C
nextInput := j.Input.Value()
nextPosition := j.Input.Position()

wg.Add(1)
j.Counter++

go func() {
defer func() { <-limiter }()
defer func() { <-threadlimiter }()
defer wg.Done()
threadStart := time.Now()
j.runTask(nextInput, nextPosition, false)
j.sleepIfNeeded()
j.Rate.Throttle()
threadEnd := time.Now()
j.Rate.Tick(threadStart, threadEnd)
}()

if !j.RunningJob {
defer j.Output.Warning(j.Error)
return
Expand Down Expand Up @@ -306,7 +308,6 @@ func (j *Job) runBackgroundTasks(wg *sync.WaitGroup) {
if !j.RunningJob {
return
}
j.Rate.Adjust()
time.Sleep(time.Millisecond * time.Duration(j.Config.ProgressFrequency))
}
}
Expand Down Expand Up @@ -444,7 +445,7 @@ func (j *Job) runTask(input map[string][]byte, position int, retried bool) {
}
}

//handleGreedyRecursionJob adds a recursion job to the queue if the maximum depth has not been reached
// handleGreedyRecursionJob adds a recursion job to the queue if the maximum depth has not been reached
func (j *Job) handleGreedyRecursionJob(resp Response) {
// Handle greedy recursion strategy. Match has been determined before calling handleRecursionJob
if j.Config.RecursionDepth == 0 || j.currentDepth < j.Config.RecursionDepth {
Expand All @@ -457,8 +458,8 @@ func (j *Job) handleGreedyRecursionJob(resp Response) {
}
}

//handleDefaultRecursionJob adds a new recursion job to the job queue if a new directory is found and maximum depth has
//not been reached
// handleDefaultRecursionJob adds a new recursion job to the job queue if a new directory is found and maximum depth has
// not been reached
func (j *Job) handleDefaultRecursionJob(resp Response) {
recUrl := resp.Request.Url + "/" + "FUZZ"
if (resp.Request.Url + "/") != resp.GetRedirectLocation(true) {
Expand Down Expand Up @@ -523,13 +524,13 @@ func (j *Job) CheckStop() {
}
}

//Stop the execution of the Job
// Stop the execution of the Job
func (j *Job) Stop() {
j.Running = false
j.Config.Cancel()
}

//Stop current, resume to next
// Stop current, resume to next
func (j *Job) Next() {
j.RunningJob = false
}
120 changes: 48 additions & 72 deletions pkg/ffuf/rate.go
Expand Up @@ -7,100 +7,76 @@ import (
)

type RateThrottle struct {
rateCounter *ring.Ring
RateAdjustment float64
RateAdjustmentPos int
Config *Config
RateMutex sync.Mutex
lastAdjustment time.Time
rateCounter *ring.Ring
Config *Config
RateMutex sync.Mutex
RateLimiter *time.Ticker
lastAdjustment time.Time
}

func NewRateThrottle(conf *Config) *RateThrottle {
return &RateThrottle{
rateCounter: ring.New(conf.Threads),
RateAdjustment: 0,
RateAdjustmentPos: 0,
Config: conf,
lastAdjustment: time.Now(),
r := &RateThrottle{
Config: conf,
lastAdjustment: time.Now(),
}
if conf.Rate > 0 {
r.rateCounter = ring.New(int(conf.Rate * 5))
} else {
r.rateCounter = ring.New(conf.Threads * 5)
}
if conf.Rate > 0 {
ratemicros := 1000000 / conf.Rate
r.RateLimiter = time.NewTicker(time.Microsecond * time.Duration(ratemicros))
} else {
//Million rps is probably a decent hardcoded upper speedlimit
r.RateLimiter = time.NewTicker(time.Microsecond * 1)
}
return r
}

//CurrentRate calculates requests/second value from circular list of rate
// CurrentRate calculates requests/second value from circular list of rate
func (r *RateThrottle) CurrentRate() int64 {
n := r.rateCounter.Len()
var total int64
total = 0
lowest := int64(0)
highest := int64(0)
r.rateCounter.Do(func(r interface{}) {
switch val := r.(type) {
case int64:
total += val
if lowest == 0 || val < lowest {
lowest = val
}
if val > highest {
highest = val
}
default:
// circular list entry was nil, happens when < number_of_threads responses have been recorded.
// circular list entry was nil, happens when < number_of_threads * 5 responses have been recorded.
// the total number of entries is less than length of the list
n -= 1
}
})
if total > 0 {
avg := total / int64(n)
return time.Second.Nanoseconds() * int64(r.Config.Threads) / avg
}

return 0
}

//rateTick adds a new duration measurement tick to rate counter
func (r *RateThrottle) Tick(start, end time.Time) {
if start.Before(r.lastAdjustment) {
// We don't want to store data for threads started pre-adjustment
return
earliest := time.UnixMicro(lowest)
latest := time.UnixMicro(highest)
elapsed := latest.Sub(earliest)
if n > 0 && elapsed.Milliseconds() > 1 {
return int64(1000 * int64(n) / elapsed.Milliseconds())
}
r.RateMutex.Lock()
defer r.RateMutex.Unlock()
dur := end.Sub(start).Nanoseconds()
r.rateCounter = r.rateCounter.Next()
r.RateAdjustmentPos += 1
r.rateCounter.Value = dur
return 0
}

func (r *RateThrottle) Throttle() {
if r.Config.Rate == 0 {
// No throttling
return
}
if r.RateAdjustment > 0.0 {
delayNS := float64(time.Second.Nanoseconds()) * r.RateAdjustment
time.Sleep(time.Nanosecond * time.Duration(delayNS))
}
func (r *RateThrottle) ChangeRate(rate int) {
ratemicros := 1000000 / rate
r.RateLimiter.Stop()
r.RateLimiter = time.NewTicker(time.Microsecond * time.Duration(ratemicros))
r.Config.Rate = int64(rate)
// reset the rate counter
r.rateCounter = ring.New(rate * 5)
}

//Adjust changes the RateAdjustment value, which is multiplier of second to pause between requests in a thread
func (r *RateThrottle) Adjust() {
if r.RateAdjustmentPos < r.Config.Threads {
// Do not adjust if we don't have enough data yet
return
}
// rateTick adds a new duration measurement tick to rate counter
func (r *RateThrottle) Tick(start, end time.Time) {
r.RateMutex.Lock()
defer r.RateMutex.Unlock()
currentRate := r.CurrentRate()

if r.RateAdjustment == 0.0 {
if currentRate > r.Config.Rate {
// If we're adjusting the rate for the first time, start at a safe point (0.2sec)
r.RateAdjustment = 0.2
return
} else {
// NOOP
return
}
}
difference := float64(currentRate) / float64(r.Config.Rate)
if r.RateAdjustment < 0.00001 && difference < 0.9 {
// Reset the rate adjustment as throttling is not relevant at current speed
r.RateAdjustment = 0.0
} else {
r.RateAdjustment = r.RateAdjustment * difference
}
// Reset the counters
r.lastAdjustment = time.Now()
r.RateAdjustmentPos = 0
r.rateCounter = r.rateCounter.Next()
r.rateCounter.Value = end.UnixMicro()
}
54 changes: 35 additions & 19 deletions pkg/interactive/termhandler.go
Expand Up @@ -177,6 +177,20 @@ func (i *interactive) handleInput(in []byte) {
case "queueskip":
i.Job.SkipQueue()
i.Job.Output.Info("Skipping to the next queued job")
case "rate":
if len(args) < 2 {
i.Job.Output.Error("Please define the new rate")
} else if len(args) > 2 {
i.Job.Output.Error("Too many arguments for \"rate\"")
} else {
newrate, err := strconv.Atoi(args[1])
if err != nil {
i.Job.Output.Error(fmt.Sprintf("Could not adjust rate: %s", err))
} else {
i.Job.Rate.ChangeRate(newrate)
}
}

default:
if i.paused {
i.Job.Output.Warning(fmt.Sprintf("Unknown command: \"%s\". Enter \"help\" for a list of available commands", args[0]))
Expand Down Expand Up @@ -278,26 +292,28 @@ func (i *interactive) printHelp() {
ft = "(active: " + filter.Repr() + ")"
}
}
rate := fmt.Sprintf("(active: %d)", i.Job.Config.Rate)
help := `
available commands:
afc [value] - append to status code filter %s
fc [value] - (re)configure status code filter %s
afl [value] - append to line count filter %s
fl [value] - (re)configure line count filter %s
afw [value] - append to word count filter %s
fw [value] - (re)configure word count filter %s
afs [value] - append to size filter %s
fs [value] - (re)configure size filter %s
aft [value] - append to time filter %s
ft [value] - (re)configure time filter %s
queueshow - show job queue
queuedel [number] - delete a job in the queue
queueskip - advance to the next queued job
restart - restart and resume the current ffuf job
resume - resume current ffuf job (or: ENTER)
show - show results for the current job
savejson [filename] - save current matches to a file
help - you are looking at it
afc [value] - append to status code filter %s
fc [value] - (re)configure status code filter %s
afl [value] - append to line count filter %s
fl [value] - (re)configure line count filter %s
afw [value] - append to word count filter %s
fw [value] - (re)configure word count filter %s
afs [value] - append to size filter %s
fs [value] - (re)configure size filter %s
aft [value] - append to time filter %s
ft [value] - (re)configure time filter %s
rate [value] - adjust rate of requests per second %s
queueshow - show job queue
queuedel [number] - delete a job in the queue
queueskip - advance to the next queued job
restart - restart and resume the current ffuf job
resume - resume current ffuf job (or: ENTER)
show - show results for the current job
savejson [filename] - save current matches to a file
help - you are looking at it
`
i.Job.Output.Raw(fmt.Sprintf(help, fc, fc, fl, fl, fw, fw, fs, fs, ft, ft))
i.Job.Output.Raw(fmt.Sprintf(help, fc, fc, fl, fl, fw, fw, fs, fs, ft, ft, rate))
}

0 comments on commit 2ce2217

Please sign in to comment.