Skip to content

Commit

Permalink
Simplify Limiter and update implementations
Browse files Browse the repository at this point in the history
The Limiter now only has a Limit method that accepts a channel.
Implementations now have been updated to reflect this interface and to
rename the original "Limit(int, duration)" to "SimpleLimit".
  • Loading branch information
andrewstuart committed Jul 2, 2015
1 parent 78eb782 commit f0077bb
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 62 deletions.
163 changes: 126 additions & 37 deletions README.md
Expand Up @@ -5,32 +5,14 @@
import "git.astuart.co/andrew/limio"

Package limio provides an interface abstraction for rate limiting or flow
control of arbitrary io.Readers or io.Writers. Several concrete implementations
of Limiters are also provided.
control of arbitrary io.Readers or io.Writers. A concrete implementation of a
handler is also provided for ease of use and reference are also provided.

## Usage

#### func NewReader

```go
func NewReader(r io.Reader, l Limiter) io.Reader
```
NewReader takes an io.Reader and a Limiter and returns an io.Reader that will be
limited via the strategy that Limiter choses to implement.

#### type ByteCount

```go
type ByteCount uint64
```

A ByteCount is simply an abstraction over some integer type to provide more
flexibility should the type need to be changed. Since zettabyte overflows
uint64, I suppose it may someday need to be changed.

```go
const (
B ByteCount = 1 << (10 * (iota))
B int = 1 << (10 * (iota))
KB
MB
GB
Expand All @@ -39,32 +21,139 @@ const (
EB
)
```
Some useful constants with the proper typing
Some useful byte-sized (heh) constants

```go
const DefaultWindow = 10 * time.Millisecond
```

#### func Distribute

```go
func Distribute(n int, t, w time.Duration) (int, time.Duration)
```
Distribute takes a rate (n, t) and window (w), evenly distributes the n/t to
n'/t' (n'<=n && t'>=w)

#### type Limiter

```go
type Limiter interface {
GetLimit() <-chan ByteCount
Limit(n int, t time.Duration) <-chan bool //The channel is useful for knowing that the channel has been unlimited
LimitChan(chan int) <-chan bool
Unlimit()
io.Closer
}
```


#### type Manager

```go
type Manager interface {
Limiter
Manage(Limiter)
Unmanage(Limiter)
}
```

A Limiter should implement some strategy for providing access to a shared io
resource. The GetLimit() function must return a channel of ByteCount. When it is
appropriate for the new limited io.Reader to read some amount of data, that
amount should be sent through the channel, at which point the io.Reader will
"burstily" read until it has exhausted the number of bytes it was told to read.

Caution is recommended when implementing a Limiter if this bursty behavior is
undesireable. If undesireable, make sure that any large ByteCounts are broken up
into smaller values sent at shorter intervals. See BasicReader for a good
example of how this can be achieved.
#### type Reader

```go
type Reader struct {
}
```


#### func NewReader

```go
func NewReader(r io.Reader) *Reader
```

#### func (*Reader) Close

```go
func (r *Reader) Close() error
```

#### func (*Reader) Limit

```go
func (r *Reader) Limit(n int, t time.Duration) <-chan bool
```

#### func (*Reader) LimitChan

```go
func (r *Reader) LimitChan(lch chan int) <-chan bool
```

#### func (*Reader) Read

```go
func (r *Reader) Read(p []byte) (written int, err error)
```

#### func (*Reader) Unlimit

```go
func (r *Reader) Unlimit()
```

#### type SimpleManager

```go
type SimpleManager struct {
}
```


#### func NewSimpleManager

```go
func NewSimpleManager() *SimpleManager
```

#### func (*SimpleManager) Close

```go
func (lm *SimpleManager) Close() error
```

#### func (*SimpleManager) Limit

```go
func (lm *SimpleManager) Limit(n int, t time.Duration) <-chan bool
```

#### func (*SimpleManager) LimitChan

```go
func (lm *SimpleManager) LimitChan(l chan int) <-chan bool
```

#### func (*SimpleManager) Manage

```go
func (lm *SimpleManager) Manage(l Limiter)
```

#### func (*SimpleManager) NewReader

```go
func (lm *SimpleManager) NewReader(r io.Reader) *Reader
```

#### func (*SimpleManager) Unlimit

```go
func (lm *SimpleManager) Unlimit()
```

#### func BasicLimiter
#### func (*SimpleManager) Unmanage

```go
func BasicLimiter(b ByteCount, t time.Duration) Limiter
func (lm *SimpleManager) Unmanage(l Limiter)
```
BasicLimiter will divvy up the bytes into 100 smaller parts to spread the load
across time
=======
15 changes: 8 additions & 7 deletions limit_manager.go
Expand Up @@ -90,10 +90,11 @@ func (lm *SimpleManager) run() {
}

//NOTE must ONLY be used synchonously with the run() goroutine for concurrency
//safety
//distribute takes a number and iterates over each channel in the map of managed
//Limiters, sending an evenly-distriuted limit to each "sublimiter".
//distribute takes a number to distribute and returns the number of bytes remaining
//safety.
//func distribute(int) takes a number and iterates over each channel in the map of
//managed Limiters, sending an evenly-distriuted limit to each "sublimiter".
//distribute takes a number to distribute and returns the number of bytes
//remaining
func (lm *SimpleManager) distribute(n int) int {
if len(lm.m) > 0 {
each := n / len(lm.m)
Expand All @@ -116,7 +117,7 @@ func (lm *SimpleManager) distribute(n int) int {
//newly returned bool channel so that limiters can be removed when closed.
func (lm *SimpleManager) limit(l Limiter) {
lm.m[l] = make(chan int)
done := l.LimitChan(lm.m[l])
done := l.Limit(lm.m[l])
go func() {
//If `true` passed on channel, limiter is closed
if <-done {
Expand All @@ -131,7 +132,7 @@ func (lm *SimpleManager) NewReader(r io.Reader) *Reader {
return lr
}

func (lm *SimpleManager) Limit(n int, t time.Duration) <-chan bool {
func (lm *SimpleManager) SimpleLimit(n int, t time.Duration) <-chan bool {
done := make(chan bool, 1)
lm.newLimit <- &limit{
rate: rate{n, t},
Expand All @@ -140,7 +141,7 @@ func (lm *SimpleManager) Limit(n int, t time.Duration) <-chan bool {
return done
}

func (lm *SimpleManager) LimitChan(l chan int) <-chan bool {
func (lm *SimpleManager) Limit(l chan int) <-chan bool {
done := make(chan bool, 1)
lm.newLimit <- &limit{
lim: l,
Expand Down
6 changes: 3 additions & 3 deletions limit_manager_test.go
Expand Up @@ -16,7 +16,7 @@ func TestManager(t *testing.T) {

lmr := NewSimpleManager()
ch := make(chan int, 1)
lmr.LimitChan(ch)
lmr.Limit(ch)

l1 := lmr.NewReader(strings.NewReader(testText))
l2 := lmr.NewReader(strings.NewReader(testText))
Expand Down Expand Up @@ -60,7 +60,7 @@ func TestManager(t *testing.T) {
l2.Read(p)
lmr.Unmanage(l3)

lmr.Limit(KB, 10*time.Millisecond)
lmr.SimpleLimit(KB, 10*time.Millisecond)

//Drain channel
n, err = l1.Read(p)
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestManager(t *testing.T) {
t.Errorf("Should have thrown EOF after reached EOF.")
}

done := lmr.Limit(KB, time.Second)
done := lmr.SimpleLimit(KB, time.Second)
lmr.Manage(l3)

w := &sync.WaitGroup{}
Expand Down
10 changes: 4 additions & 6 deletions limiter.go
@@ -1,13 +1,11 @@
package limio

import (
"io"
"time"
)
import "io"

//A Limiter is an interface that meters some underlying discretely quantifiable
//operation with respect to time.
type Limiter interface {
Limit(n int, t time.Duration) <-chan bool //The channel is useful for knowing that the channel has been unlimited
LimitChan(chan int) <-chan bool
Limit(chan int) <-chan bool //The channel is useful for knowing that the channel has been unlimited
Unlimit()
io.Closer
}
4 changes: 2 additions & 2 deletions reader.go
Expand Up @@ -47,7 +47,7 @@ func (r *Reader) Unlimit() {
r.newLimit <- nil
}

func (r *Reader) Limit(n int, t time.Duration) <-chan bool {
func (r *Reader) SimpleLimit(n int, t time.Duration) <-chan bool {
done := make(chan bool, 1)
r.newLimit <- &limit{
rate: rate{n, t},
Expand All @@ -56,7 +56,7 @@ func (r *Reader) Limit(n int, t time.Duration) <-chan bool {
return done
}

func (r *Reader) LimitChan(lch chan int) <-chan bool {
func (r *Reader) Limit(lch chan int) <-chan bool {
done := make(chan bool, 1)
r.newLimit <- &limit{
lim: lch,
Expand Down
14 changes: 7 additions & 7 deletions reader_test.go
Expand Up @@ -43,7 +43,7 @@ func TestLimitedReader(t *testing.T) {
c := make(chan int)

lr := NewReader(r)
lr.LimitChan(c)
lr.Limit(c)

nBytes := 20
c <- nBytes
Expand Down Expand Up @@ -106,7 +106,7 @@ func TestEOF(t *testing.T) {
c := make(chan int)

lr := NewReader(r)
lr.LimitChan(c)
lr.Limit(c)

go func() {
c <- KB
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestNoLimit(t *testing.T) {

func TestBasicLimit(t *testing.T) {
r := NewReader(strings.NewReader(testText))
r.Limit(80, 100*time.Millisecond)
r.SimpleLimit(80, 100*time.Millisecond)

p := make([]byte, len(testText))
n, err := r.Read(p)
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestUnlimit(t *testing.T) {

ch := make(chan int, 1)
ch <- 20
r.LimitChan(ch)
r.Limit(ch)

p := make([]byte, len(testText))

Expand Down Expand Up @@ -238,7 +238,7 @@ func TestClose(t *testing.T) {
r := NewReader(strings.NewReader(testText))

ch := make(chan int, 1)
r.LimitChan(ch)
r.Limit(ch)
err := r.Close()

if err != nil {
Expand All @@ -263,7 +263,7 @@ func TestDualLimit(t *testing.T) {

ch := make(chan int, 1)
ch <- 20
done := r.LimitChan(ch)
done := r.Limit(ch)

p := make([]byte, len(testText))

Expand All @@ -279,7 +279,7 @@ func TestDualLimit(t *testing.T) {

ch = make(chan int, 1)
ch <- 30
r.LimitChan(ch)
r.Limit(ch)

if _, cls := <-done; !cls {
t.Errorf("did not close done")
Expand Down

0 comments on commit f0077bb

Please sign in to comment.