From 2ce22175dad658cf46f97ac4582dcff58549e646 Mon Sep 17 00:00:00 2001 From: Joona Hoikkala Date: Wed, 4 Jan 2023 11:23:32 +0200 Subject: [PATCH] Enhanced rate limiting (#620) * Enhanced rate limiting * Use time.Ticker correctly --- pkg/ffuf/job.go | 39 +++++------ pkg/ffuf/rate.go | 120 +++++++++++++-------------------- pkg/interactive/termhandler.go | 54 +++++++++------ 3 files changed, 103 insertions(+), 110 deletions(-) diff --git a/pkg/ffuf/job.go b/pkg/ffuf/job.go index 5bea32c2..768c2317 100644 --- a/pkg/ffuf/job.go +++ b/pkg/ffuf/job.go @@ -11,7 +11,7 @@ import ( "time" ) -//Job ties together Config, Runner, Input and Output +// Job ties together Config, Runner, Input and Output type Job struct { Config *Config ErrorMutex sync.Mutex @@ -63,7 +63,7 @@ func NewJob(conf *Config) *Job { return &j } -//incError increments the error counter +// incError increments the error counter func (j *Job) incError() { j.ErrorMutex.Lock() defer j.ErrorMutex.Unlock() @@ -71,7 +71,7 @@ func (j *Job) incError() { j.SpuriousErrorCounter++ } -//inc403 increments the 403 response counter +// inc403 increments the 403 response counter func (j *Job) inc403() { j.ErrorMutex.Lock() defer j.ErrorMutex.Unlock() @@ -85,25 +85,25 @@ func (j *Job) inc429() { j.Count429++ } -//resetSpuriousErrors resets the spurious error counter +// resetSpuriousErrors resets the spurious error counter func (j *Job) resetSpuriousErrors() { j.ErrorMutex.Lock() defer j.ErrorMutex.Unlock() j.SpuriousErrorCounter = 0 } -//DeleteQueueItem deletes a recursion job from the queue by its index in the slice +// DeleteQueueItem deletes a recursion job from the queue by its index in the slice func (j *Job) DeleteQueueItem(index int) { index = j.queuepos + index - 1 j.queuejobs = append(j.queuejobs[:index], j.queuejobs[index+1:]...) } -//QueuedJobs returns the slice of queued recursive jobs +// QueuedJobs returns the slice of queued recursive jobs func (j *Job) QueuedJobs() []QueueJob { return j.queuejobs[j.queuepos-1:] } -//Start the execution of the Job +// Start the execution of the Job func (j *Job) Start() { if j.startTime.IsZero() { j.startTime = time.Now() @@ -182,7 +182,7 @@ func (j *Job) prepareQueueJob() { j.queuepos += 1 } -//SkipQueue allows to skip the current job and advance to the next queued recursion job +// SkipQueue allows to skip the current job and advance to the next queued recursion job func (j *Job) SkipQueue() { j.skipQueue = true } @@ -238,7 +238,7 @@ func (j *Job) startExecution() { } //Limiter blocks after reaching the buffer, ensuring limited concurrency - limiter := make(chan bool, j.Config.Threads) + threadlimiter := make(chan bool, j.Config.Threads) for j.Input.Next() && !j.skipQueue { // Check if we should stop the process @@ -249,23 +249,25 @@ func (j *Job) startExecution() { break } j.pauseWg.Wait() - limiter <- true + // Handle the rate & thread limiting + threadlimiter <- true + // Ratelimiter handles the rate ticker + <-j.Rate.RateLimiter.C nextInput := j.Input.Value() nextPosition := j.Input.Position() + wg.Add(1) j.Counter++ go func() { - defer func() { <-limiter }() + defer func() { <-threadlimiter }() defer wg.Done() threadStart := time.Now() j.runTask(nextInput, nextPosition, false) j.sleepIfNeeded() - j.Rate.Throttle() threadEnd := time.Now() j.Rate.Tick(threadStart, threadEnd) }() - if !j.RunningJob { defer j.Output.Warning(j.Error) return @@ -306,7 +308,6 @@ func (j *Job) runBackgroundTasks(wg *sync.WaitGroup) { if !j.RunningJob { return } - j.Rate.Adjust() time.Sleep(time.Millisecond * time.Duration(j.Config.ProgressFrequency)) } } @@ -444,7 +445,7 @@ func (j *Job) runTask(input map[string][]byte, position int, retried bool) { } } -//handleGreedyRecursionJob adds a recursion job to the queue if the maximum depth has not been reached +// handleGreedyRecursionJob adds a recursion job to the queue if the maximum depth has not been reached func (j *Job) handleGreedyRecursionJob(resp Response) { // Handle greedy recursion strategy. Match has been determined before calling handleRecursionJob if j.Config.RecursionDepth == 0 || j.currentDepth < j.Config.RecursionDepth { @@ -457,8 +458,8 @@ func (j *Job) handleGreedyRecursionJob(resp Response) { } } -//handleDefaultRecursionJob adds a new recursion job to the job queue if a new directory is found and maximum depth has -//not been reached +// handleDefaultRecursionJob adds a new recursion job to the job queue if a new directory is found and maximum depth has +// not been reached func (j *Job) handleDefaultRecursionJob(resp Response) { recUrl := resp.Request.Url + "/" + "FUZZ" if (resp.Request.Url + "/") != resp.GetRedirectLocation(true) { @@ -523,13 +524,13 @@ func (j *Job) CheckStop() { } } -//Stop the execution of the Job +// Stop the execution of the Job func (j *Job) Stop() { j.Running = false j.Config.Cancel() } -//Stop current, resume to next +// Stop current, resume to next func (j *Job) Next() { j.RunningJob = false } diff --git a/pkg/ffuf/rate.go b/pkg/ffuf/rate.go index 7c709928..48a69404 100644 --- a/pkg/ffuf/rate.go +++ b/pkg/ffuf/rate.go @@ -7,100 +7,76 @@ import ( ) type RateThrottle struct { - rateCounter *ring.Ring - RateAdjustment float64 - RateAdjustmentPos int - Config *Config - RateMutex sync.Mutex - lastAdjustment time.Time + rateCounter *ring.Ring + Config *Config + RateMutex sync.Mutex + RateLimiter *time.Ticker + lastAdjustment time.Time } func NewRateThrottle(conf *Config) *RateThrottle { - return &RateThrottle{ - rateCounter: ring.New(conf.Threads), - RateAdjustment: 0, - RateAdjustmentPos: 0, - Config: conf, - lastAdjustment: time.Now(), + r := &RateThrottle{ + Config: conf, + lastAdjustment: time.Now(), } + if conf.Rate > 0 { + r.rateCounter = ring.New(int(conf.Rate * 5)) + } else { + r.rateCounter = ring.New(conf.Threads * 5) + } + if conf.Rate > 0 { + ratemicros := 1000000 / conf.Rate + r.RateLimiter = time.NewTicker(time.Microsecond * time.Duration(ratemicros)) + } else { + //Million rps is probably a decent hardcoded upper speedlimit + r.RateLimiter = time.NewTicker(time.Microsecond * 1) + } + return r } -//CurrentRate calculates requests/second value from circular list of rate +// CurrentRate calculates requests/second value from circular list of rate func (r *RateThrottle) CurrentRate() int64 { n := r.rateCounter.Len() - var total int64 - total = 0 + lowest := int64(0) + highest := int64(0) r.rateCounter.Do(func(r interface{}) { switch val := r.(type) { case int64: - total += val + if lowest == 0 || val < lowest { + lowest = val + } + if val > highest { + highest = val + } default: - // circular list entry was nil, happens when < number_of_threads responses have been recorded. + // circular list entry was nil, happens when < number_of_threads * 5 responses have been recorded. // the total number of entries is less than length of the list n -= 1 } }) - if total > 0 { - avg := total / int64(n) - return time.Second.Nanoseconds() * int64(r.Config.Threads) / avg - } - - return 0 -} -//rateTick adds a new duration measurement tick to rate counter -func (r *RateThrottle) Tick(start, end time.Time) { - if start.Before(r.lastAdjustment) { - // We don't want to store data for threads started pre-adjustment - return + earliest := time.UnixMicro(lowest) + latest := time.UnixMicro(highest) + elapsed := latest.Sub(earliest) + if n > 0 && elapsed.Milliseconds() > 1 { + return int64(1000 * int64(n) / elapsed.Milliseconds()) } - r.RateMutex.Lock() - defer r.RateMutex.Unlock() - dur := end.Sub(start).Nanoseconds() - r.rateCounter = r.rateCounter.Next() - r.RateAdjustmentPos += 1 - r.rateCounter.Value = dur + return 0 } -func (r *RateThrottle) Throttle() { - if r.Config.Rate == 0 { - // No throttling - return - } - if r.RateAdjustment > 0.0 { - delayNS := float64(time.Second.Nanoseconds()) * r.RateAdjustment - time.Sleep(time.Nanosecond * time.Duration(delayNS)) - } +func (r *RateThrottle) ChangeRate(rate int) { + ratemicros := 1000000 / rate + r.RateLimiter.Stop() + r.RateLimiter = time.NewTicker(time.Microsecond * time.Duration(ratemicros)) + r.Config.Rate = int64(rate) + // reset the rate counter + r.rateCounter = ring.New(rate * 5) } -//Adjust changes the RateAdjustment value, which is multiplier of second to pause between requests in a thread -func (r *RateThrottle) Adjust() { - if r.RateAdjustmentPos < r.Config.Threads { - // Do not adjust if we don't have enough data yet - return - } +// rateTick adds a new duration measurement tick to rate counter +func (r *RateThrottle) Tick(start, end time.Time) { r.RateMutex.Lock() defer r.RateMutex.Unlock() - currentRate := r.CurrentRate() - - if r.RateAdjustment == 0.0 { - if currentRate > r.Config.Rate { - // If we're adjusting the rate for the first time, start at a safe point (0.2sec) - r.RateAdjustment = 0.2 - return - } else { - // NOOP - return - } - } - difference := float64(currentRate) / float64(r.Config.Rate) - if r.RateAdjustment < 0.00001 && difference < 0.9 { - // Reset the rate adjustment as throttling is not relevant at current speed - r.RateAdjustment = 0.0 - } else { - r.RateAdjustment = r.RateAdjustment * difference - } - // Reset the counters - r.lastAdjustment = time.Now() - r.RateAdjustmentPos = 0 + r.rateCounter = r.rateCounter.Next() + r.rateCounter.Value = end.UnixMicro() } diff --git a/pkg/interactive/termhandler.go b/pkg/interactive/termhandler.go index 5846bf44..63a35f57 100644 --- a/pkg/interactive/termhandler.go +++ b/pkg/interactive/termhandler.go @@ -177,6 +177,20 @@ func (i *interactive) handleInput(in []byte) { case "queueskip": i.Job.SkipQueue() i.Job.Output.Info("Skipping to the next queued job") + case "rate": + if len(args) < 2 { + i.Job.Output.Error("Please define the new rate") + } else if len(args) > 2 { + i.Job.Output.Error("Too many arguments for \"rate\"") + } else { + newrate, err := strconv.Atoi(args[1]) + if err != nil { + i.Job.Output.Error(fmt.Sprintf("Could not adjust rate: %s", err)) + } else { + i.Job.Rate.ChangeRate(newrate) + } + } + default: if i.paused { i.Job.Output.Warning(fmt.Sprintf("Unknown command: \"%s\". Enter \"help\" for a list of available commands", args[0])) @@ -278,26 +292,28 @@ func (i *interactive) printHelp() { ft = "(active: " + filter.Repr() + ")" } } + rate := fmt.Sprintf("(active: %d)", i.Job.Config.Rate) help := ` available commands: - afc [value] - append to status code filter %s - fc [value] - (re)configure status code filter %s - afl [value] - append to line count filter %s - fl [value] - (re)configure line count filter %s - afw [value] - append to word count filter %s - fw [value] - (re)configure word count filter %s - afs [value] - append to size filter %s - fs [value] - (re)configure size filter %s - aft [value] - append to time filter %s - ft [value] - (re)configure time filter %s - queueshow - show job queue - queuedel [number] - delete a job in the queue - queueskip - advance to the next queued job - restart - restart and resume the current ffuf job - resume - resume current ffuf job (or: ENTER) - show - show results for the current job - savejson [filename] - save current matches to a file - help - you are looking at it + afc [value] - append to status code filter %s + fc [value] - (re)configure status code filter %s + afl [value] - append to line count filter %s + fl [value] - (re)configure line count filter %s + afw [value] - append to word count filter %s + fw [value] - (re)configure word count filter %s + afs [value] - append to size filter %s + fs [value] - (re)configure size filter %s + aft [value] - append to time filter %s + ft [value] - (re)configure time filter %s + rate [value] - adjust rate of requests per second %s + queueshow - show job queue + queuedel [number] - delete a job in the queue + queueskip - advance to the next queued job + restart - restart and resume the current ffuf job + resume - resume current ffuf job (or: ENTER) + show - show results for the current job + savejson [filename] - save current matches to a file + help - you are looking at it ` - i.Job.Output.Raw(fmt.Sprintf(help, fc, fc, fl, fl, fw, fw, fs, fs, ft, ft)) + i.Job.Output.Raw(fmt.Sprintf(help, fc, fc, fl, fl, fw, fw, fs, fs, ft, ft, rate)) }