Skip to content

Commit

Permalink
Updated eventual so that it doesn't need goroutines and therefore can…
Browse files Browse the repository at this point in the history
…'t leak goroutines
  • Loading branch information
oxtoacart committed Apr 20, 2016
1 parent 8c9ec37 commit cc4f213
Show file tree
Hide file tree
Showing 3 changed files with 259 additions and 45 deletions.
152 changes: 115 additions & 37 deletions src/github.com/getlantern/eventual/eventual.go
Expand Up @@ -2,45 +2,51 @@
package eventual

import (
"math"
"sync"
"sync/atomic"
"time"
)

const (
FALSE = 0
TRUE = 1
)

// Value is an eventual value, meaning that callers wishing to access the value
// block until the value is available.
type Value interface {
// Set sets this Value to the given val.
Set(val interface{})

// Get gets the value, blocks until timeout for a value to become available if
// one isn't immediately available.
Get(timeout time.Duration) (interface{}, bool)
// Get waits up to timeout for the value to be set and returns it, or returns
// nil if it times out or Cancel() is called. valid will be false in latter
// case. If timeout is 0, Get won't wait. If timeout is -1, Get will wait
// forever.
Get(timeout time.Duration) (ret interface{}, valid bool)

// Cancel cancels this value, signaling any waiting calls to Get() that no
// value is coming. If no value was set before Cancel() was called, all future
// calls to Get() will return nil, false. Subsequent calls to Set after Cancel
// have no effect.
Cancel()
}

// Getter is a functional interface for the Value.Get function
type Getter func(time.Duration) (interface{}, bool)

type value struct {
val atomic.Value
wg sync.WaitGroup
updates chan interface{}
gotFirst int32
state atomic.Value
waiters []chan interface{}
mutex sync.Mutex
}

type stateholder struct {
val interface{}
set bool
canceled bool
}

// NewValue creates a new Value.
func NewValue() Value {
v := &value{updates: make(chan interface{})}
// Start off by incrementing the WaitGroup by 1 to indicate that we haven't
// gotten the first value yet.
v.wg.Add(1)
go v.processUpdates()
return v
result := &value{waiters: make([]chan interface{}, 0)}
result.state.Store(&stateholder{})
return result
}

// DefaultGetter builds a Getter that always returns the supplied value.
Expand All @@ -51,36 +57,108 @@ func DefaultGetter(val interface{}) Getter {
}

func (v *value) Set(val interface{}) {
v.updates <- val
v.mutex.Lock()
defer v.mutex.Unlock()

state := v.getState()
settable := !state.canceled
if settable {
v.setState(&stateholder{
val: val,
set: true,
canceled: false,
})

if v.waiters != nil {
// Notify anyone waiting for value
for _, waiter := range v.waiters {
waiter <- val
}
// Clear waiters
v.waiters = nil
}
}
}

func (v *value) processUpdates() {
for val := range v.updates {
v.val.Store(val)
if v.gotFirst == FALSE {
// Signal to blocking callers that we have the first value
v.wg.Done()
atomic.StoreInt32(&v.gotFirst, TRUE)
func (v *value) Cancel() {
v.mutex.Lock()
defer v.mutex.Unlock()

state := v.getState()
v.setState(&stateholder{
val: state.val,
set: state.set,
canceled: true,
})

if v.waiters != nil {
// Notify anyone waiting for value
for _, waiter := range v.waiters {
close(waiter)
}
// Clear waiters
v.waiters = nil
}
}

func (v *value) Get(timeout time.Duration) (interface{}, bool) {
if atomic.LoadInt32(&v.gotFirst) == TRUE {
// Short-cut used once value has been set, to avoid extra goroutine
return v.val.Load(), true
func (v *value) Get(timeout time.Duration) (ret interface{}, valid bool) {
state := v.getState()

// First check for existing value using atomic operations (for speed)
if state.set {
// Value found, use it
return state.val, true
} else if state.canceled {
// Value was canceled, return false
return nil, false
}

if timeout == 0 {
// Don't wait
return nil, false
}

// If we didn't find an existing value, try again but this time using locking
v.mutex.Lock()
state = v.getState()

if state.set {
// Value found, use it
v.mutex.Unlock()
return state.val, true
} else if state.canceled {
// Value was canceled, return false
v.mutex.Unlock()
return nil, false
}

if timeout == -1 {
// Wait essentially forever
timeout = time.Duration(math.MaxInt64)
}

valCh := make(chan interface{})
go func() {
v.wg.Wait()
valCh <- v.val.Load()
}()
// Value not found, register to be notified once value is set
valCh := make(chan interface{}, 1)
v.waiters = append(v.waiters, valCh)
v.mutex.Unlock()

// Wait up to timeout for value to get set
select {
case val := <-valCh:
return val, true
case v, ok := <-valCh:
return v, ok
case <-time.After(timeout):
return nil, false
}
}

func (v *value) getState() *stateholder {
state := v.state.Load()
if state == nil {
return nil
}
return state.(*stateholder)
}

func (v *value) setState(state *stateholder) {
v.state.Store(state)
}
73 changes: 65 additions & 8 deletions src/github.com/getlantern/eventual/eventual_test.go
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/getlantern/grtrack"
"github.com/stretchr/testify/assert"
)

Expand All @@ -14,18 +15,68 @@ const (
)

func TestSingle(t *testing.T) {
goroutines := grtrack.Start()
v := NewValue()
go func() {
time.Sleep(20 * time.Millisecond)
v.Set("hi")
}()

r, ok := v.Get(10 * time.Millisecond)
r, ok := v.Get(0)
assert.False(t, ok, "Get with no timeout should have failed")

r, ok = v.Get(10 * time.Millisecond)
assert.False(t, ok, "Get with short timeout should have timed out")

r, ok = v.Get(20 * time.Millisecond)
assert.True(t, ok, "Get with longer timeout should have succeed")
r, ok = v.Get(-1)
assert.True(t, ok, "Get with really long timeout should have succeeded")
assert.Equal(t, "hi", r, "Wrong result")

// Set a different value
v.Set("bye")
r, ok = v.Get(0)
assert.True(t, ok, "Subsequent get with no timeout should have succeeded")
assert.Equal(t, "bye", r, "Value should have changed")

goroutines.CheckAfter(t, 50*time.Millisecond)
}

func TestNoSet(t *testing.T) {
goroutines := grtrack.Start()
v := NewValue()

_, ok := v.Get(10 * time.Millisecond)
assert.False(t, ok, "Get before setting value should not be okay")

goroutines.CheckAfter(t, 50*time.Millisecond)
}

func TestCancelImmediate(t *testing.T) {
v := NewValue()
go func() {
time.Sleep(10 * time.Millisecond)
v.Cancel()
}()

_, ok := v.Get(200 * time.Millisecond)
assert.False(t, ok, "Get after cancel should have failed")
}

func TestCancelAfterSet(t *testing.T) {
v := NewValue()
v.Set(5)
r, ok := v.Get(10 * time.Millisecond)
assert.True(t, ok, "Get before cancel should have succeeded")
assert.Equal(t, 5, r, "Get got wrong value before cancel")

v.Cancel()
r, ok = v.Get(0)
assert.True(t, ok, "Get after cancel should have succeeded")
assert.Equal(t, 5, r, "Get got wrong value after cancel")

v.Set(10)
r, ok = v.Get(0)
assert.Equal(t, 5, r, "Set after cancel should have no effect")
}

func BenchmarkGet(b *testing.B) {
Expand All @@ -41,9 +92,10 @@ func BenchmarkGet(b *testing.B) {
}

func TestConcurrent(t *testing.T) {
goroutines := grtrack.Start()
v := NewValue()

var sets int32 = 0
var sets int32

go func() {
var wg sync.WaitGroup
Expand All @@ -61,9 +113,14 @@ func TestConcurrent(t *testing.T) {
wg.Done()
}()

time.Sleep(50 * time.Millisecond)
r, ok := v.Get(20 * time.Millisecond)
assert.True(t, ok, "Get should have succeed")
assert.Equal(t, "hi", r, "Wrong result")
for i := 0; i < concurrency; i++ {
go func() {
r, ok := v.Get(200 * time.Millisecond)
assert.True(t, ok, "Get should have succeed")
assert.Equal(t, "hi", r, "Wrong result")
}()
}

goroutines.CheckAfter(t, 50*time.Millisecond)
assert.EqualValues(t, concurrency, atomic.LoadInt32(&sets), "Wrong number of successful Sets")
}
79 changes: 79 additions & 0 deletions src/github.com/getlantern/grtrack/grtrack.go
@@ -0,0 +1,79 @@
// Package grtrack provides a utility that helps check for goroutine leaks.
package grtrack

import (
"bytes"
"regexp"
"runtime/pprof"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

var (
goroutineNumber = regexp.MustCompile(`goroutine ([0-9]+)`)
)

// Object that can be used to check whether goroutines have leaked at any point
// in time.
type Checker interface {
// Check immediately checks whether there's been a leak
Check(t *testing.T)

// CheckAfter waits wait and then checks
CheckAfter(t *testing.T, wait time.Duration)
}

type checker struct {
check func(t *testing.T)
}

func Start() Checker {
var buf bytes.Buffer
_ = pprof.Lookup("goroutine").WriteTo(&buf, 2)
before := buf.String()

check := func(t *testing.T) {
var buf bytes.Buffer
_ = pprof.Lookup("goroutine").WriteTo(&buf, 2)
after := buf.String()

beforeGoroutines := make(map[string]bool)
beforeMatches := goroutineNumber.FindAllStringSubmatch(before, -1)
for _, match := range beforeMatches {
beforeGoroutines[match[1]] = true
}

afterMatches := goroutineNumber.FindAllStringSubmatchIndex(after, -1)
for i := 0; i < len(afterMatches); i++ {
idx := afterMatches[i][0]
nextIdx := len(after)
last := i == len(afterMatches)-1
if !last {
nextIdx = afterMatches[i+1][0]
}
matches := goroutineNumber.FindAllStringSubmatch(after[idx:], 1)
num := matches[0][1]
_, exists := beforeGoroutines[num]
if !exists {
delta := after[idx:nextIdx]
if !strings.Contains(delta, "net/http/server.go") {
assert.Fail(t, "Leaked Goroutine", delta)
}
}
}
}

return &checker{check}
}

func (c *checker) Check(t *testing.T) {
c.check(t)
}

func (c *checker) CheckAfter(t *testing.T, wait time.Duration) {
time.Sleep(wait)
c.check(t)
}

0 comments on commit cc4f213

Please sign in to comment.