Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 15 additions & 28 deletions observer.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package rxgo

import (
"sync"

"github.com/reactivex/rxgo/options"

"github.com/reactivex/rxgo/handlers"
Expand All @@ -25,8 +23,7 @@ type Observer interface {
}

type observer struct {
disposedMutex sync.Mutex
disposed bool
disposed chan struct{}
nextHandler handlers.NextFunc
errHandler handlers.ErrFunc
doneHandler handlers.DoneFunc
Expand Down Expand Up @@ -55,7 +52,9 @@ func (o *observer) getChannel() chan interface{} {
// NewObserver constructs a new Observer instance with default Observer and accept
// any number of EventHandler
func NewObserver(eventHandlers ...handlers.EventHandler) Observer {
ob := observer{}
ob := observer{
disposed: make(chan struct{}),
}

if len(eventHandlers) > 0 {
for _, handler := range eventHandlers {
Expand Down Expand Up @@ -98,24 +97,21 @@ func (o *observer) Handle(item interface{}) {
}

func (o *observer) Dispose() {
o.disposedMutex.Lock()
o.disposed = true
o.disposedMutex.Unlock()
close(o.disposed)
}

func (o *observer) IsDisposed() bool {
o.disposedMutex.Lock()
defer o.disposedMutex.Unlock()
return o.disposed
select {
case <-o.disposed:
return true
default:
return false
}
}

// OnNext applies Observer's NextHandler to an Item
func (o *observer) OnNext(item interface{}) {
o.disposedMutex.Lock()
disposed := o.disposed
o.disposedMutex.Unlock()

if !disposed {
if !o.IsDisposed() {
switch item := item.(type) {
case error:
return
Expand All @@ -131,10 +127,7 @@ func (o *observer) OnNext(item interface{}) {

// OnError applies Observer's ErrHandler to an error
func (o *observer) OnError(err error) {
o.disposedMutex.Lock()
disposed := o.disposed
o.disposedMutex.Unlock()
if !disposed {
if !o.IsDisposed() {
if o.errHandler != nil {
o.errHandler(err)
o.Dispose()
Expand All @@ -150,10 +143,7 @@ func (o *observer) OnError(err error) {

// OnDone terminates the Observer's internal Observable
func (o *observer) OnDone() {
o.disposedMutex.Lock()
disposed := o.disposed
o.disposedMutex.Unlock()
if !disposed {
if !o.IsDisposed() {
if o.doneHandler != nil {
o.doneHandler()
o.Dispose()
Expand All @@ -169,10 +159,7 @@ func (o *observer) OnDone() {

// OnDone terminates the Observer's internal Observable
func (o *observer) Block() error {
o.disposedMutex.Lock()
disposed := o.disposed
o.disposedMutex.Unlock()
if !disposed {
if !o.IsDisposed() {
for v := range o.done {
return v
}
Expand Down
11 changes: 11 additions & 0 deletions observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,14 @@ func TestHandle(t *testing.T) {
ob.Handle(errors.New(""))
assert.Equal(t, 7, i)
}

func BenchmarkObserver_IsDisposed(b *testing.B) {
for n := 0; n < b.N; n++ {
o := NewObserver()
for i := 0; i < 10; i++ {
o.IsDisposed()
}
o.Dispose()
o.IsDisposed()
}
}
74 changes: 32 additions & 42 deletions singleobserver.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package rxgo

import (
"sync"

"github.com/reactivex/rxgo/handlers"
)

Expand All @@ -11,24 +9,24 @@ type SingleObserver interface {
handlers.EventHandler
Disposable

Block() (interface{}, error)
OnSuccess(item interface{})
OnError(err error)

Block() (interface{}, error)
}

type singleObserver struct {
disposedMutex sync.Mutex
disposed bool
nextHandler handlers.NextFunc
errHandler handlers.ErrFunc
done chan interface{}
disposed chan struct{}
nextHandler handlers.NextFunc
errHandler handlers.ErrFunc
done chan interface{}
}

// NewSingleObserver constructs a new SingleObserver instance with default SingleObserver and accept
// any number of EventHandler
func NewSingleObserver(eventHandlers ...handlers.EventHandler) SingleObserver {
ob := singleObserver{}
ob := singleObserver{
disposed: make(chan struct{}),
}

if len(eventHandlers) > 0 {
for _, handler := range eventHandlers {
Expand Down Expand Up @@ -66,23 +64,36 @@ func (o *singleObserver) Handle(item interface{}) {
}

func (o *singleObserver) Dispose() {
o.disposedMutex.Lock()
o.disposed = true
o.disposedMutex.Unlock()
close(o.disposed)
}

func (o *singleObserver) IsDisposed() bool {
o.disposedMutex.Lock()
defer o.disposedMutex.Unlock()
return o.disposed
select {
case <-o.disposed:
return true
default:
return false
}
}

// Block terminates the SingleObserver's internal Observable
func (o *singleObserver) Block() (interface{}, error) {
if !o.IsDisposed() {
for v := range o.done {
switch v := v.(type) {
case error:
return nil, v
default:
return v, nil
}
}
}
return nil, nil
}

// OnNext applies SingleObserver's NextHandler to an Item
func (o *singleObserver) OnSuccess(item interface{}) {
o.disposedMutex.Lock()
disposed := o.disposed
o.disposedMutex.Unlock()
if !disposed {
if !o.IsDisposed() {
switch item := item.(type) {
case error:
return
Expand All @@ -103,10 +114,7 @@ func (o *singleObserver) OnSuccess(item interface{}) {

// OnError applies SingleObserver's ErrHandler to an error
func (o *singleObserver) OnError(err error) {
o.disposedMutex.Lock()
disposed := o.disposed
o.disposedMutex.Unlock()
if !disposed {
if !o.IsDisposed() {
if o.errHandler != nil {
o.errHandler(err)
o.Dispose()
Expand All @@ -119,21 +127,3 @@ func (o *singleObserver) OnError(err error) {
// TODO
}
}

// Block terminates the SingleObserver's internal Observable
func (o *singleObserver) Block() (interface{}, error) {
o.disposedMutex.Lock()
disposed := o.disposed
o.disposedMutex.Unlock()
if !disposed {
for v := range o.done {
switch v := v.(type) {
case error:
return nil, v
default:
return v, nil
}
}
}
return nil, nil
}
11 changes: 11 additions & 0 deletions singleobserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,14 @@ func TestSingleObserverHandleWithError(t *testing.T) {
singleObserver.Handle(errors.New(""))
assert.Equal(t, int64(10), got)
}

func BenchmarkSingleObserver_IsDisposed(b *testing.B) {
for n := 0; n < b.N; n++ {
o := NewSingleObserver()
for i := 0; i < 10; i++ {
o.IsDisposed()
}
o.Dispose()
o.IsDisposed()
}
}