Skip to content

Commit

Permalink
Add SetTimeout to Reader function.
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewstuart committed Jul 2, 2015
1 parent 2411adb commit 20b06b9
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 27 deletions.
12 changes: 9 additions & 3 deletions limit_manager.go
@@ -1,6 +1,7 @@
package limio

import (
"errors"
"io"
"time"
)
Expand All @@ -11,7 +12,7 @@ import (
//manage their own distribution of the bandwidth they are allocated.
type Manager interface {
Limiter
Manage(Limiter)
Manage(Limiter) error
Unmanage(Limiter)
}

Expand Down Expand Up @@ -187,7 +188,12 @@ func (lm *SimpleManager) Unmanage(l Limiter) {
}

//Manage takes a Limiter that will be adopted under the management policy of
//the SimpleManager
func (lm *SimpleManager) Manage(l Limiter) {
//the SimpleManager.
func (lm *SimpleManager) Manage(l Limiter) error {
if l == lm {
return errors.New("A manager cannot manage itself.")
}

lm.newLimiter <- l
return nil
}
7 changes: 6 additions & 1 deletion limit_manager_test.go
Expand Up @@ -11,10 +11,15 @@ import (
_ "net/http/pprof"
)

func verifyIsManager(m Manager) {}

func TestManager(t *testing.T) {
go http.ListenAndServe(":6060", nil)

lmr := NewSimpleManager()

verifyIsManager(lmr)

ch := make(chan int, 1)
lmr.Limit(ch)

Expand Down Expand Up @@ -109,7 +114,7 @@ func TestManager(t *testing.T) {

go func() {
if cls := <-done; !cls {
t.Errorf("Did not close \"done\" and pass true. Done: %t, Ok: %t", cls)
t.Errorf("Did not close \"done\" and pass true. Done: %t", cls)
}
w.Done()
}()
Expand Down
85 changes: 62 additions & 23 deletions reader.go
@@ -1,6 +1,7 @@
package limio

import (
"errors"
"io"
"sync"
"time"
Expand All @@ -16,6 +17,9 @@ type Reader struct {
limitedM *sync.RWMutex
limited bool

timeoutM *sync.Mutex
timeout time.Duration

rate chan int
used chan int
newLimit chan *limit
Expand All @@ -38,6 +42,7 @@ func NewReader(r io.Reader) *Reader {
lr := Reader{
r: r,
limitedM: &sync.RWMutex{},
timeoutM: &sync.Mutex{},
newLimit: make(chan *limit),
rate: make(chan int, 10),
used: make(chan int),
Expand Down Expand Up @@ -82,6 +87,9 @@ func (r *Reader) Close() error {
return nil
}

//ErrTimeoutExceeded will be returned upon a timeout lapsing without a read occuring
var ErrTimeoutExceeded error = errors.New("Timeout Exceeded")

//Read implements io.Reader in a blocking manner according to the limits of the
//limio.Reader.
func (r *Reader) Read(p []byte) (written int, err error) {
Expand All @@ -95,18 +103,39 @@ func (r *Reader) Read(p []byte) (written int, err error) {
for written < len(p) && err == nil {

r.limitedM.RLock()
if r.limited {
r.limitedM.RUnlock()
select {
case lim = <-r.rate:
default:
if written > 0 {
isLimited := r.limited
r.limitedM.RUnlock()

if isLimited {

r.timeoutM.Lock()
timeLimit := r.timeout
r.timeoutM.Unlock()

//TODO consolidate two cases if possible. Dynamic select via reflection?
if timeLimit > 0 {
select {
case <-time.After(timeLimit):
err = ErrTimeoutExceeded
return
case lim = <-r.rate:
default:
if written > 0 {
return
}
lim = <-r.rate
}
} else {
select {
case lim = <-r.rate:
default:
if written > 0 {
return
}
lim = <-r.rate
}
lim = <-r.rate
}
} else {
r.limitedM.RUnlock()
lim = len(p[written:])
}

Expand Down Expand Up @@ -134,11 +163,21 @@ func (r *Reader) send(i int) {
}
}

//SetTimeout takes some time.Duration t and configures the underlying Reader to
//return a limio.TimedOut error if the timeout is exceeded while waiting for a
//read operation.
func (r *Reader) SetTimeout(t time.Duration) error {
r.timeoutM.Lock()
r.timeout = t
r.timeoutM.Unlock()
return nil
}

func (r *Reader) run() {
er := rate{}
cl := &limit{}
emptyRate := rate{}
currLim := &limit{}

currTicker := &time.Ticker{}
rateTicker := &time.Ticker{}

//This loop is important for serializing access to the limits and the
//io.Reader being managed
Expand All @@ -149,32 +188,32 @@ func (r *Reader) run() {
r.limited = false
r.limitedM.Unlock()

currTicker.Stop()
go notify(cl.done, true)
rateTicker.Stop()
go notify(currLim.done, true)

close(r.newLimit)
close(r.used)
close(r.rate)

return
case l := <-cl.lim:
case l := <-currLim.lim:
r.send(l)
case <-currTicker.C:
r.send(cl.rate.n)
case <-rateTicker.C:
r.send(currLim.rate.n)
case l := <-r.newLimit:
go notify(cl.done, false)
currTicker.Stop()
cl = &limit{}
go notify(currLim.done, false)
rateTicker.Stop()
currLim = &limit{}

if l != nil {
cl = l
currLim = l
r.limitedM.Lock()
r.limited = true
r.limitedM.Unlock()

if cl.rate != er && cl.rate.n != 0 {
cl.rate.n, cl.rate.t = Distribute(cl.rate.n, cl.rate.t, DefaultWindow)
currTicker = time.NewTicker(cl.rate.t)
if currLim.rate != emptyRate && currLim.rate.n != 0 {
currLim.rate.n, currLim.rate.t = Distribute(currLim.rate.n, currLim.rate.t, DefaultWindow)
rateTicker = time.NewTicker(currLim.rate.t)
}
} else {
r.limitedM.Lock()
Expand Down

0 comments on commit 20b06b9

Please sign in to comment.