Skip to content

Commit

Permalink
feat: add counting semaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
brianmcgee committed Sep 7, 2022
1 parent 9294fde commit dd7187a
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 4 deletions.
48 changes: 44 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@ http://localhost:6060/pkg/github.com/41north/go-async
$ go get -u github.com/41north/go-async
```

## Quick Start

### Future

Add this import line to the file you're working in:

```Go
import "github.com/41north/go-async"
```

## Quick Start

### Future

A basic example:

```Go
Expand All @@ -53,6 +53,46 @@ go func() {
f.Set("hello")
```

### Counting Semaphore

A basic example:

```go
// we create an input and output channel for work needing to be done
inCh := make(chan string, 128)
outCh := make(chan int, 128)

// we want a max of 10 in-flight processes
s := NewCountingSemaphore(10)

// we create more workers than tokens available
for i := 0; i < 100; i++ {
go func() {
for {
// acquire a token, waiting until one is available
s.Acquire(1)

// consume from the input channel
v, ok := <-inCh
if !ok {
// channel was closed
return
}

// do some work and produce an output value
outCh <- len(v)

// you need to be careful about releasing, if possible perform it with defer
s.Release(1)
}
}()
}

// generate some work and put it into the work queue
// ...
// ...
```

There are more examples available in the go doc.

## License
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ require github.com/stretchr/testify v1.8.0
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/tidwall/btree v1.4.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/tidwall/btree v1.4.2 h1:PpkaieETJMUxYNADsjgtNRcERX7mGc/GP2zp/r5FM3g=
github.com/tidwall/btree v1.4.2/go.mod h1:LGm8L/DZjPLmeWGjv5kFrY8dL4uVhMmzmmLYmsObdKE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
52 changes: 52 additions & 0 deletions semaphore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package async

import (
"runtime"
"sync/atomic"
)

// NewCountingSemaphore creates a new semaphore with specified amount of available tokens.
func NewCountingSemaphore(size int32) CountingSemaphore {
result := countingSemaphore{size: size}
result.tokens.Store(size)
return &result
}

type countingSemaphore struct {
size int32
tokens atomic.Int32
}

func (l *countingSemaphore) Size() int32 {
return l.size
}

func (l *countingSemaphore) Acquire(count int32) {
for !l.TryAcquire(count) {
runtime.Gosched()
}
}

func (l *countingSemaphore) TryAcquire(count int32) bool {
if l.tokens.Add(count*-1) < 0 {
// acquire failed, cancel the attempt by adding back what was removed
l.tokens.Add(count)
return false
}
return true
}

func (l *countingSemaphore) Release(count int32) {
for !l.TryRelease(count) {
runtime.Gosched()
}
}

func (l *countingSemaphore) TryRelease(count int32) bool {
current := l.tokens.Load()
update := current + count
if update > l.size {
return false
}
return l.tokens.CompareAndSwap(current, update)
}
127 changes: 127 additions & 0 deletions semaphore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package async

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/tidwall/btree"
)

func ExampleCountingSemaphore() {
// we create an input and output channel for work needing to be done
inCh := make(chan string, 128)
outCh := make(chan int, 128)

// we want a max of 10 in-flight processes
s := NewCountingSemaphore(10)

// we create more workers than tokens available
for i := 0; i < 100; i++ {
go func() {
for {
// acquire a token, waiting until one is available
s.Acquire(1)

// consume from the input channel
v, ok := <-inCh
if !ok {
// channel was closed
return
}

// do some work and produce an output value
outCh <- len(v)

// you need to be careful about releasing, if possible perform it with defer
s.Release(1)
}
}()
}

// generate some work and put it into the work queue
// ...
// ...
}

func TestCountingSemaphore_TryAcquireAndRelease(t *testing.T) {
s := NewCountingSemaphore(2)

assert.Equal(t, int32(2), s.Size())

// tokens are available
assert.True(t, s.TryAcquire(1))
assert.True(t, s.TryAcquire(1))

// tokens have been exhausted
assert.False(t, s.TryAcquire(1))
assert.False(t, s.TryAcquire(10))

// release
assert.True(t, s.TryRelease(1))

// acquire again
assert.True(t, s.TryAcquire(1))

// exhausted again
assert.False(t, s.TryAcquire(1))

// try to release more than the size
assert.False(t, s.TryRelease(100))
}

func TestCountingSemaphore_Pipeline(t *testing.T) {
workCount := 1000000
workerCount := 100

semaphore := NewCountingSemaphore(32)

inCh := make(chan int, 1024)
outCh := make(chan int, 1024)

wg := sync.WaitGroup{}
wg.Add(1)

// process the output and verify we see all the work items produced
go func() {
results := btree.Set[int]{}
for {
v := <-outCh
results.Insert(v)
if results.Len() == workCount {
break
}
}

for i := 0; i < results.Len(); i++ {
v, ok := results.GetAt(i)
assert.True(t, ok)
assert.Equal(t, i, v)
}

wg.Done()
}()

// generate workers and start processing
for i := 0; i < workerCount; i++ {
go func() {
for {
semaphore.Acquire(1)
v, ok := <-inCh
if !ok {
return // channel was closed
}
outCh <- v
semaphore.Release(1)
}
}()
}

for i := 0; i < workCount; i++ {
// generate work
inCh <- i
}
close(inCh)

wg.Wait()
}
20 changes: 20 additions & 0 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,23 @@ type Result[T any] interface {
// Unwrap deconstructs the contents of this Result into a tuple.
Unwrap() (T, error)
}

// CountingSemaphore can be used to limit the amount of in-flight processes / tasks.
type CountingSemaphore interface {
// Size returns the total number of tokens available withing this CountingSemaphore.
Size() int32

// Acquire attempts to acquire an amount of tokens from the semaphore, waiting until it is successful.
Acquire(count int32)

// TryAcquire attempts to acquire an amount of tokens from the semaphore and returns whether
// it was successful or not.
TryAcquire(count int32) bool

// Release attempts to return a certain amount of tokens to the semaphore, waiting until it is successful.
Release(count int32)

// TryRelease attempts to return a certain amount of tokens to the semaphore and returns whether
// it was successful or not.
TryRelease(count int32) bool
}

0 comments on commit dd7187a

Please sign in to comment.