Skip to content

Commit

Permalink
Add rate limit
Browse files Browse the repository at this point in the history
  • Loading branch information
icambridge committed Feb 11, 2017
1 parent ea82867 commit a9b0a02
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 25 deletions.
13 changes: 8 additions & 5 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ type Pool interface {
}

type NonTimeLimitedPool struct {
input chan Task
output chan interface{}
wg *sync.WaitGroup
input chan Task
output chan interface{}
wg *sync.WaitGroup
}

func (p NonTimeLimitedPool) End() {
Expand Down Expand Up @@ -73,12 +73,15 @@ func (p *NonTimeLimitedPool) worker() {
}
}

func (p *NonTimeLimitedPool) AddWorker() {
func (p NonTimeLimitedPool) AddWorker() {
p.wg.Add(1)
go p.worker()
}

func NewPool(options PoolOptions) Pool {
if options.Duration != 0 && options.PerDuration != 0 {
return NewTimeLimitedPool(options)
}
return NewNonTimeLimitedPool(options)
}

Expand All @@ -94,4 +97,4 @@ func NewNonTimeLimitedPool(options PoolOptions) NonTimeLimitedPool {
p.AddWorker()
}
return p
}
}
11 changes: 1 addition & 10 deletions pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package cartel
import (
"runtime"
"testing"
"time"
)

func Test_Create_Starts_Correct_Number_Of_Goroutines(t *testing.T) {
Expand Down Expand Up @@ -50,7 +49,7 @@ func Test_Returns_Output(t *testing.T) {

values := p.GetOutput()

if expected, actual := 1, len(values);expected != actual {
if expected, actual := 1, len(values); expected != actual {
t.Errorf("expected %v but got %v ", expected, actual)
}
value := values[0]
Expand Down Expand Up @@ -88,11 +87,3 @@ type TestTask struct {
func (tt TestTask) Execute() interface{} {
return tt.Name
}

type TestTimeTask struct {
Name string
}

func (ttt TestTimeTask) Execute() interface{} {
return time.Now()
}
41 changes: 31 additions & 10 deletions time_limited_pool.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
package cartel

import (
"runtime"
"sync"
"time"
)

type TimeLimitedPool struct {
input chan Task
output chan interface{}
wg *sync.WaitGroup
available map[int]bool
options PoolOptions
input chan Task
output chan interface{}
wg *sync.WaitGroup
options PoolOptions
lastChecked time.Time
allowace float64
}


func (p TimeLimitedPool) End() {
close(p.input)
p.wg.Wait()
Expand Down Expand Up @@ -41,32 +42,52 @@ func (p TimeLimitedPool) GetOutput() []interface{} {

func (p *TimeLimitedPool) worker() {
for {
p.rateLimit()
t, ok := <-p.input

if !ok {
p.wg.Done()
break
}

v := t.Execute()
p.lastChecked = time.Now()

p.output <- v
runtime.GC()
}
}

func (p *TimeLimitedPool) AddWorker() {
func (p *TimeLimitedPool) rateLimit() {
lastChecked := p.lastChecked
now := time.Now()

p.lastChecked = now
diff := float64(now.Unix() - lastChecked.Unix())
floatRate := float64(p.options.PerDuration)
floatDuration := float64(p.options.Duration.Seconds())

p.allowace = p.allowace + (diff * (floatRate / floatDuration))
if p.allowace < 1.0 {
time.Sleep(p.options.Duration)
}
}

func (p TimeLimitedPool) AddWorker() {
p.wg.Add(1)
go p.worker()
}

func NewTimeLimitedPool(options PoolOptions) TimeLimitedPool {

jobs := make(chan Task, 100)
results := make(chan interface{}, 100)
available := map[int]bool{}

var wg sync.WaitGroup
p := TimeLimitedPool{jobs, results, &wg, available, options}
p := TimeLimitedPool{jobs, results, &wg, options, time.Now(), 0.0}

for w := 1; w <= options.Size; w++ {
p.AddWorker()
}
return p
}
}
40 changes: 40 additions & 0 deletions time_limited_pool_test.go
Original file line number Diff line number Diff line change
@@ -1 +1,41 @@
package cartel

import (
"testing"
"time"
)

func Test_Runs_One_Per_Second(t *testing.T) {

p := NewPool(
PoolOptions{
Size: 1,
PerDuration: 1,
Duration: time.Second,
},
)
p.Do(TestTimeTask{})
p.Do(TestTimeTask{})
p.End()

values := p.GetOutput()

if actual := len(values); actual != 2 {
t.Errorf("expected 2 items but got %v", actual)
}

firstTime := values[0].(time.Time)
secondTime := values[1].(time.Time)

d := secondTime.Sub(firstTime)
if d.Seconds() < 1 {
t.Errorf("expected 1 seconds but got %v - %v %v", d.Seconds(), firstTime, secondTime)
}
}

type TestTimeTask struct {
}

func (ttt TestTimeTask) Execute() interface{} {
return time.Now()
}

0 comments on commit a9b0a02

Please sign in to comment.