Skip to content

Commit

Permalink
Merge pull request #1279 from hashicorp/rate-limit-consul
Browse files Browse the repository at this point in the history
add basic rate limiting to consul api calls
  • Loading branch information
eikenb committed Sep 10, 2019
2 parents 612ebbc + 008b27a commit 2a944d7
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 4 deletions.
8 changes: 4 additions & 4 deletions watch/dependencies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type TestDep struct {
}

func (d *TestDep) Fetch(clients *dep.ClientSet, opts *dep.QueryOptions) (interface{}, *dep.ResponseMetadata, error) {
time.Sleep(10 * time.Millisecond)
time.Sleep(time.Millisecond)
data := "this is some data"
rm := &dep.ResponseMetadata{LastIndex: 1}
return data, rm, nil
Expand Down Expand Up @@ -42,7 +42,7 @@ type TestDepStale struct {

// Fetch is used to implement the dependency interface.
func (d *TestDepStale) Fetch(clients *dep.ClientSet, opts *dep.QueryOptions) (interface{}, *dep.ResponseMetadata, error) {
time.Sleep(10 * time.Millisecond)
time.Sleep(time.Millisecond)

if opts == nil {
opts = &dep.QueryOptions{}
Expand Down Expand Up @@ -79,7 +79,7 @@ type TestDepFetchError struct {
}

func (d *TestDepFetchError) Fetch(clients *dep.ClientSet, opts *dep.QueryOptions) (interface{}, *dep.ResponseMetadata, error) {
time.Sleep(10 * time.Millisecond)
time.Sleep(time.Millisecond)
return nil, nil, fmt.Errorf("failed to contact server")
}

Expand Down Expand Up @@ -129,7 +129,7 @@ type TestDepRetry struct {
}

func (d *TestDepRetry) Fetch(clients *dep.ClientSet, opts *dep.QueryOptions) (interface{}, *dep.ResponseMetadata, error) {
time.Sleep(10 * time.Millisecond)
time.Sleep(time.Millisecond)

d.Lock()
defer d.Unlock()
Expand Down
19 changes: 19 additions & 0 deletions watch/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package watch
import (
"fmt"
"log"
"math/rand"
"reflect"
"sync"
"time"
Expand Down Expand Up @@ -207,6 +208,8 @@ func (v *View) fetch(doneCh, successCh chan<- struct{}, errCh chan<- error) {
default:
}

start := time.Now() // for rateLimiter below

data, rm, err := v.dependency.Fetch(v.clients, &dep.QueryOptions{
AllowStale: allowStale,
WaitTime: defaultWaitTime,
Expand Down Expand Up @@ -247,6 +250,10 @@ func (v *View) fetch(doneCh, successCh chan<- struct{}, errCh chan<- error) {
allowStale = true
}

if dur := rateLimiter(start); dur > 1 {
time.Sleep(dur)
}

if rm.LastIndex == v.lastIndex {
log.Printf("[TRACE] (view) %s no new data (index was the same)", v.dependency)
continue
Expand Down Expand Up @@ -282,6 +289,18 @@ func (v *View) fetch(doneCh, successCh chan<- struct{}, errCh chan<- error) {
}
}

const minDelayBetweenUpdates = time.Millisecond * 100

// return a duration to sleep to limit the frequency of upstream calls
func rateLimiter(start time.Time) time.Duration {
remaining := minDelayBetweenUpdates - time.Since(start)
if remaining > 0 {
dither := time.Duration(rand.Int63n(20000000)) // 0-20ms
return remaining + dither
}
return 0
}

// stop halts polling of this view.
func (v *View) stop() {
v.dependency.Stop()
Expand Down
21 changes: 21 additions & 0 deletions watch/view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,3 +273,24 @@ func TestStop_stopsPolling(t *testing.T) {
// Successfully stopped
}
}

func TestRateLimiter(t *testing.T) {
// test for rate limiting delay working
elapsed := minDelayBetweenUpdates / 2 // simulate time passing
start := time.Now().Add(-elapsed) // add negative to subtract
dur := rateLimiter(start) // should close to elapsed
if !(dur > 0) {
t.Errorf("rate limiting duration should be > 0, found: %v", dur)
}
if dur > minDelayBetweenUpdates {
t.Errorf("rate limiting duration extected to be < %v, found %v",
minDelayBetweenUpdates, dur)
}
// test that you get 0 when enough time is past
elapsed = minDelayBetweenUpdates // simulate time passing
start = time.Now().Add(-elapsed) // add negative to subtract
dur = rateLimiter(start) // should be 0
if dur != 0 {
t.Errorf("rate limiting duration should be 0, found: %v", dur)
}
}

0 comments on commit 2a944d7

Please sign in to comment.