Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhanced rate limiting #620

Merged
merged 2 commits into from
Jan 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
39 changes: 20 additions & 19 deletions pkg/ffuf/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"time"
)

//Job ties together Config, Runner, Input and Output
// Job ties together Config, Runner, Input and Output
type Job struct {
Config *Config
ErrorMutex sync.Mutex
Expand Down Expand Up @@ -63,15 +63,15 @@ func NewJob(conf *Config) *Job {
return &j
}

//incError increments the error counter
// incError increments the error counter
func (j *Job) incError() {
j.ErrorMutex.Lock()
defer j.ErrorMutex.Unlock()
j.ErrorCounter++
j.SpuriousErrorCounter++
}

//inc403 increments the 403 response counter
// inc403 increments the 403 response counter
func (j *Job) inc403() {
j.ErrorMutex.Lock()
defer j.ErrorMutex.Unlock()
Expand All @@ -85,25 +85,25 @@ func (j *Job) inc429() {
j.Count429++
}

//resetSpuriousErrors resets the spurious error counter
// resetSpuriousErrors resets the spurious error counter
func (j *Job) resetSpuriousErrors() {
j.ErrorMutex.Lock()
defer j.ErrorMutex.Unlock()
j.SpuriousErrorCounter = 0
}

//DeleteQueueItem deletes a recursion job from the queue by its index in the slice
// DeleteQueueItem deletes a recursion job from the queue by its index in the slice
func (j *Job) DeleteQueueItem(index int) {
index = j.queuepos + index - 1
j.queuejobs = append(j.queuejobs[:index], j.queuejobs[index+1:]...)
}

//QueuedJobs returns the slice of queued recursive jobs
// QueuedJobs returns the slice of queued recursive jobs
func (j *Job) QueuedJobs() []QueueJob {
return j.queuejobs[j.queuepos-1:]
}

//Start the execution of the Job
// Start the execution of the Job
func (j *Job) Start() {
if j.startTime.IsZero() {
j.startTime = time.Now()
Expand Down Expand Up @@ -182,7 +182,7 @@ func (j *Job) prepareQueueJob() {
j.queuepos += 1
}

//SkipQueue allows to skip the current job and advance to the next queued recursion job
// SkipQueue allows to skip the current job and advance to the next queued recursion job
func (j *Job) SkipQueue() {
j.skipQueue = true
}
Expand Down Expand Up @@ -238,7 +238,7 @@ func (j *Job) startExecution() {
}

//Limiter blocks after reaching the buffer, ensuring limited concurrency
limiter := make(chan bool, j.Config.Threads)
threadlimiter := make(chan bool, j.Config.Threads)

for j.Input.Next() && !j.skipQueue {
// Check if we should stop the process
Expand All @@ -249,23 +249,25 @@ func (j *Job) startExecution() {
break
}
j.pauseWg.Wait()
limiter <- true
// Handle the rate & thread limiting
threadlimiter <- true
// Ratelimiter handles the rate ticker
<-j.Rate.RateLimiter.C
nextInput := j.Input.Value()
nextPosition := j.Input.Position()

wg.Add(1)
j.Counter++

go func() {
defer func() { <-limiter }()
defer func() { <-threadlimiter }()
defer wg.Done()
threadStart := time.Now()
j.runTask(nextInput, nextPosition, false)
j.sleepIfNeeded()
j.Rate.Throttle()
threadEnd := time.Now()
j.Rate.Tick(threadStart, threadEnd)
}()

if !j.RunningJob {
defer j.Output.Warning(j.Error)
return
Expand Down Expand Up @@ -306,7 +308,6 @@ func (j *Job) runBackgroundTasks(wg *sync.WaitGroup) {
if !j.RunningJob {
return
}
j.Rate.Adjust()
time.Sleep(time.Millisecond * time.Duration(j.Config.ProgressFrequency))
}
}
Expand Down Expand Up @@ -444,7 +445,7 @@ func (j *Job) runTask(input map[string][]byte, position int, retried bool) {
}
}

//handleGreedyRecursionJob adds a recursion job to the queue if the maximum depth has not been reached
// handleGreedyRecursionJob adds a recursion job to the queue if the maximum depth has not been reached
func (j *Job) handleGreedyRecursionJob(resp Response) {
// Handle greedy recursion strategy. Match has been determined before calling handleRecursionJob
if j.Config.RecursionDepth == 0 || j.currentDepth < j.Config.RecursionDepth {
Expand All @@ -457,8 +458,8 @@ func (j *Job) handleGreedyRecursionJob(resp Response) {
}
}

//handleDefaultRecursionJob adds a new recursion job to the job queue if a new directory is found and maximum depth has
//not been reached
// handleDefaultRecursionJob adds a new recursion job to the job queue if a new directory is found and maximum depth has
// not been reached
func (j *Job) handleDefaultRecursionJob(resp Response) {
recUrl := resp.Request.Url + "/" + "FUZZ"
if (resp.Request.Url + "/") != resp.GetRedirectLocation(true) {
Expand Down Expand Up @@ -523,13 +524,13 @@ func (j *Job) CheckStop() {
}
}

//Stop the execution of the Job
// Stop the execution of the Job
func (j *Job) Stop() {
j.Running = false
j.Config.Cancel()
}

//Stop current, resume to next
// Stop current, resume to next
func (j *Job) Next() {
j.RunningJob = false
}
120 changes: 48 additions & 72 deletions pkg/ffuf/rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,100 +7,76 @@ import (
)

type RateThrottle struct {
rateCounter *ring.Ring
RateAdjustment float64
RateAdjustmentPos int
Config *Config
RateMutex sync.Mutex
lastAdjustment time.Time
rateCounter *ring.Ring
Config *Config
RateMutex sync.Mutex
RateLimiter *time.Ticker
lastAdjustment time.Time
}

func NewRateThrottle(conf *Config) *RateThrottle {
return &RateThrottle{
rateCounter: ring.New(conf.Threads),
RateAdjustment: 0,
RateAdjustmentPos: 0,
Config: conf,
lastAdjustment: time.Now(),
r := &RateThrottle{
Config: conf,
lastAdjustment: time.Now(),
}
if conf.Rate > 0 {
r.rateCounter = ring.New(int(conf.Rate * 5))
} else {
r.rateCounter = ring.New(conf.Threads * 5)
}
if conf.Rate > 0 {
ratemicros := 1000000 / conf.Rate
r.RateLimiter = time.NewTicker(time.Microsecond * time.Duration(ratemicros))
} else {
//Million rps is probably a decent hardcoded upper speedlimit
r.RateLimiter = time.NewTicker(time.Microsecond * 1)
}
return r
}

//CurrentRate calculates requests/second value from circular list of rate
// CurrentRate calculates requests/second value from circular list of rate
func (r *RateThrottle) CurrentRate() int64 {
n := r.rateCounter.Len()
var total int64
total = 0
lowest := int64(0)
highest := int64(0)
r.rateCounter.Do(func(r interface{}) {
switch val := r.(type) {
case int64:
total += val
if lowest == 0 || val < lowest {
lowest = val
}
if val > highest {
highest = val
}
default:
// circular list entry was nil, happens when < number_of_threads responses have been recorded.
// circular list entry was nil, happens when < number_of_threads * 5 responses have been recorded.
// the total number of entries is less than length of the list
n -= 1
}
})
if total > 0 {
avg := total / int64(n)
return time.Second.Nanoseconds() * int64(r.Config.Threads) / avg
}

return 0
}

//rateTick adds a new duration measurement tick to rate counter
func (r *RateThrottle) Tick(start, end time.Time) {
if start.Before(r.lastAdjustment) {
// We don't want to store data for threads started pre-adjustment
return
earliest := time.UnixMicro(lowest)
latest := time.UnixMicro(highest)
elapsed := latest.Sub(earliest)
if n > 0 && elapsed.Milliseconds() > 1 {
return int64(1000 * int64(n) / elapsed.Milliseconds())
}
r.RateMutex.Lock()
defer r.RateMutex.Unlock()
dur := end.Sub(start).Nanoseconds()
r.rateCounter = r.rateCounter.Next()
r.RateAdjustmentPos += 1
r.rateCounter.Value = dur
return 0
}

func (r *RateThrottle) Throttle() {
if r.Config.Rate == 0 {
// No throttling
return
}
if r.RateAdjustment > 0.0 {
delayNS := float64(time.Second.Nanoseconds()) * r.RateAdjustment
time.Sleep(time.Nanosecond * time.Duration(delayNS))
}
func (r *RateThrottle) ChangeRate(rate int) {
ratemicros := 1000000 / rate
r.RateLimiter.Stop()
r.RateLimiter = time.NewTicker(time.Microsecond * time.Duration(ratemicros))
r.Config.Rate = int64(rate)
// reset the rate counter
r.rateCounter = ring.New(rate * 5)
}

//Adjust changes the RateAdjustment value, which is multiplier of second to pause between requests in a thread
func (r *RateThrottle) Adjust() {
if r.RateAdjustmentPos < r.Config.Threads {
// Do not adjust if we don't have enough data yet
return
}
// rateTick adds a new duration measurement tick to rate counter
func (r *RateThrottle) Tick(start, end time.Time) {
r.RateMutex.Lock()
defer r.RateMutex.Unlock()
currentRate := r.CurrentRate()

if r.RateAdjustment == 0.0 {
if currentRate > r.Config.Rate {
// If we're adjusting the rate for the first time, start at a safe point (0.2sec)
r.RateAdjustment = 0.2
return
} else {
// NOOP
return
}
}
difference := float64(currentRate) / float64(r.Config.Rate)
if r.RateAdjustment < 0.00001 && difference < 0.9 {
// Reset the rate adjustment as throttling is not relevant at current speed
r.RateAdjustment = 0.0
} else {
r.RateAdjustment = r.RateAdjustment * difference
}
// Reset the counters
r.lastAdjustment = time.Now()
r.RateAdjustmentPos = 0
r.rateCounter = r.rateCounter.Next()
r.rateCounter.Value = end.UnixMicro()
}
54 changes: 35 additions & 19 deletions pkg/interactive/termhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,20 @@ func (i *interactive) handleInput(in []byte) {
case "queueskip":
i.Job.SkipQueue()
i.Job.Output.Info("Skipping to the next queued job")
case "rate":
if len(args) < 2 {
i.Job.Output.Error("Please define the new rate")
} else if len(args) > 2 {
i.Job.Output.Error("Too many arguments for \"rate\"")
} else {
newrate, err := strconv.Atoi(args[1])
if err != nil {
i.Job.Output.Error(fmt.Sprintf("Could not adjust rate: %s", err))
} else {
i.Job.Rate.ChangeRate(newrate)
}
}

default:
if i.paused {
i.Job.Output.Warning(fmt.Sprintf("Unknown command: \"%s\". Enter \"help\" for a list of available commands", args[0]))
Expand Down Expand Up @@ -278,26 +292,28 @@ func (i *interactive) printHelp() {
ft = "(active: " + filter.Repr() + ")"
}
}
rate := fmt.Sprintf("(active: %d)", i.Job.Config.Rate)
help := `
available commands:
afc [value] - append to status code filter %s
fc [value] - (re)configure status code filter %s
afl [value] - append to line count filter %s
fl [value] - (re)configure line count filter %s
afw [value] - append to word count filter %s
fw [value] - (re)configure word count filter %s
afs [value] - append to size filter %s
fs [value] - (re)configure size filter %s
aft [value] - append to time filter %s
ft [value] - (re)configure time filter %s
queueshow - show job queue
queuedel [number] - delete a job in the queue
queueskip - advance to the next queued job
restart - restart and resume the current ffuf job
resume - resume current ffuf job (or: ENTER)
show - show results for the current job
savejson [filename] - save current matches to a file
help - you are looking at it
afc [value] - append to status code filter %s
fc [value] - (re)configure status code filter %s
afl [value] - append to line count filter %s
fl [value] - (re)configure line count filter %s
afw [value] - append to word count filter %s
fw [value] - (re)configure word count filter %s
afs [value] - append to size filter %s
fs [value] - (re)configure size filter %s
aft [value] - append to time filter %s
ft [value] - (re)configure time filter %s
rate [value] - adjust rate of requests per second %s
queueshow - show job queue
queuedel [number] - delete a job in the queue
queueskip - advance to the next queued job
restart - restart and resume the current ffuf job
resume - resume current ffuf job (or: ENTER)
show - show results for the current job
savejson [filename] - save current matches to a file
help - you are looking at it
`
i.Job.Output.Raw(fmt.Sprintf(help, fc, fc, fl, fl, fw, fw, fs, fs, ft, ft))
i.Job.Output.Raw(fmt.Sprintf(help, fc, fc, fl, fl, fw, fw, fs, fs, ft, ft, rate))
}