diff --git a/observer.go b/observer.go index decce191..450db68d 100644 --- a/observer.go +++ b/observer.go @@ -1,8 +1,6 @@ package rxgo import ( - "sync" - "github.com/reactivex/rxgo/options" "github.com/reactivex/rxgo/handlers" @@ -25,8 +23,7 @@ type Observer interface { } type observer struct { - disposedMutex sync.Mutex - disposed bool + disposed chan struct{} nextHandler handlers.NextFunc errHandler handlers.ErrFunc doneHandler handlers.DoneFunc @@ -55,7 +52,9 @@ func (o *observer) getChannel() chan interface{} { // NewObserver constructs a new Observer instance with default Observer and accept // any number of EventHandler func NewObserver(eventHandlers ...handlers.EventHandler) Observer { - ob := observer{} + ob := observer{ + disposed: make(chan struct{}), + } if len(eventHandlers) > 0 { for _, handler := range eventHandlers { @@ -98,24 +97,21 @@ func (o *observer) Handle(item interface{}) { } func (o *observer) Dispose() { - o.disposedMutex.Lock() - o.disposed = true - o.disposedMutex.Unlock() + close(o.disposed) } func (o *observer) IsDisposed() bool { - o.disposedMutex.Lock() - defer o.disposedMutex.Unlock() - return o.disposed + select { + case <-o.disposed: + return true + default: + return false + } } // OnNext applies Observer's NextHandler to an Item func (o *observer) OnNext(item interface{}) { - o.disposedMutex.Lock() - disposed := o.disposed - o.disposedMutex.Unlock() - - if !disposed { + if !o.IsDisposed() { switch item := item.(type) { case error: return @@ -131,10 +127,7 @@ func (o *observer) OnNext(item interface{}) { // OnError applies Observer's ErrHandler to an error func (o *observer) OnError(err error) { - o.disposedMutex.Lock() - disposed := o.disposed - o.disposedMutex.Unlock() - if !disposed { + if !o.IsDisposed() { if o.errHandler != nil { o.errHandler(err) o.Dispose() @@ -150,10 +143,7 @@ func (o *observer) OnError(err error) { // OnDone terminates the Observer's internal Observable func (o *observer) OnDone() { - o.disposedMutex.Lock() - disposed := o.disposed - o.disposedMutex.Unlock() - if !disposed { + if !o.IsDisposed() { if o.doneHandler != nil { o.doneHandler() o.Dispose() @@ -169,10 +159,7 @@ func (o *observer) OnDone() { // OnDone terminates the Observer's internal Observable func (o *observer) Block() error { - o.disposedMutex.Lock() - disposed := o.disposed - o.disposedMutex.Unlock() - if !disposed { + if !o.IsDisposed() { for v := range o.done { return v } diff --git a/observer_test.go b/observer_test.go index f4a4282c..31d10883 100644 --- a/observer_test.go +++ b/observer_test.go @@ -56,3 +56,14 @@ func TestHandle(t *testing.T) { ob.Handle(errors.New("")) assert.Equal(t, 7, i) } + +func BenchmarkObserver_IsDisposed(b *testing.B) { + for n := 0; n < b.N; n++ { + o := NewObserver() + for i := 0; i < 10; i++ { + o.IsDisposed() + } + o.Dispose() + o.IsDisposed() + } +} diff --git a/singleobserver.go b/singleobserver.go index 9bda0020..0672ae56 100644 --- a/singleobserver.go +++ b/singleobserver.go @@ -1,8 +1,6 @@ package rxgo import ( - "sync" - "github.com/reactivex/rxgo/handlers" ) @@ -11,24 +9,24 @@ type SingleObserver interface { handlers.EventHandler Disposable + Block() (interface{}, error) OnSuccess(item interface{}) OnError(err error) - - Block() (interface{}, error) } type singleObserver struct { - disposedMutex sync.Mutex - disposed bool - nextHandler handlers.NextFunc - errHandler handlers.ErrFunc - done chan interface{} + disposed chan struct{} + nextHandler handlers.NextFunc + errHandler handlers.ErrFunc + done chan interface{} } // NewSingleObserver constructs a new SingleObserver instance with default SingleObserver and accept // any number of EventHandler func NewSingleObserver(eventHandlers ...handlers.EventHandler) SingleObserver { - ob := singleObserver{} + ob := singleObserver{ + disposed: make(chan struct{}), + } if len(eventHandlers) > 0 { for _, handler := range eventHandlers { @@ -66,23 +64,36 @@ func (o *singleObserver) Handle(item interface{}) { } func (o *singleObserver) Dispose() { - o.disposedMutex.Lock() - o.disposed = true - o.disposedMutex.Unlock() + close(o.disposed) } func (o *singleObserver) IsDisposed() bool { - o.disposedMutex.Lock() - defer o.disposedMutex.Unlock() - return o.disposed + select { + case <-o.disposed: + return true + default: + return false + } +} + +// Block terminates the SingleObserver's internal Observable +func (o *singleObserver) Block() (interface{}, error) { + if !o.IsDisposed() { + for v := range o.done { + switch v := v.(type) { + case error: + return nil, v + default: + return v, nil + } + } + } + return nil, nil } // OnNext applies SingleObserver's NextHandler to an Item func (o *singleObserver) OnSuccess(item interface{}) { - o.disposedMutex.Lock() - disposed := o.disposed - o.disposedMutex.Unlock() - if !disposed { + if !o.IsDisposed() { switch item := item.(type) { case error: return @@ -103,10 +114,7 @@ func (o *singleObserver) OnSuccess(item interface{}) { // OnError applies SingleObserver's ErrHandler to an error func (o *singleObserver) OnError(err error) { - o.disposedMutex.Lock() - disposed := o.disposed - o.disposedMutex.Unlock() - if !disposed { + if !o.IsDisposed() { if o.errHandler != nil { o.errHandler(err) o.Dispose() @@ -119,21 +127,3 @@ func (o *singleObserver) OnError(err error) { // TODO } } - -// Block terminates the SingleObserver's internal Observable -func (o *singleObserver) Block() (interface{}, error) { - o.disposedMutex.Lock() - disposed := o.disposed - o.disposedMutex.Unlock() - if !disposed { - for v := range o.done { - switch v := v.(type) { - case error: - return nil, v - default: - return v, nil - } - } - } - return nil, nil -} diff --git a/singleobserver_test.go b/singleobserver_test.go index bbcfd43c..5e1feb20 100644 --- a/singleobserver_test.go +++ b/singleobserver_test.go @@ -55,3 +55,14 @@ func TestSingleObserverHandleWithError(t *testing.T) { singleObserver.Handle(errors.New("")) assert.Equal(t, int64(10), got) } + +func BenchmarkSingleObserver_IsDisposed(b *testing.B) { + for n := 0; n < b.N; n++ { + o := NewSingleObserver() + for i := 0; i < 10; i++ { + o.IsDisposed() + } + o.Dispose() + o.IsDisposed() + } +}