Skip to content

Commit

Permalink
Hot observable initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Dec 11, 2018
1 parent 6d7ab5d commit 57ebee4
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 97 deletions.
94 changes: 45 additions & 49 deletions connectableobservable.go
@@ -1,10 +1,9 @@
package rxgo

import (
"sync"

"github.com/reactivex/rxgo/handlers"
"github.com/reactivex/rxgo/options"
"sync"
)

type ConnectableObservable interface {
Expand All @@ -14,12 +13,13 @@ type ConnectableObservable interface {
}

type connectableObservable struct {
iterator Iterator
observable Observable
observers []Observer
iterator Iterator
observable Observable
observersMutex sync.Mutex
observers []Observer
}

func newConnectableObservableFromObservable(observable Observable) ConnectableObservable {
func newConnectableObservable(observable Observable) ConnectableObservable {
return &connectableObservable{
observable: observable,
iterator: observable.Iterator(),
Expand All @@ -31,57 +31,53 @@ func (c *connectableObservable) Iterator() Iterator {
}

func (c *connectableObservable) Subscribe(handler handlers.EventHandler, opts ...options.Option) Observer {
observableOptions := options.ParseOptions(opts...)

ob := CheckEventHandler(handler)
ob.setBackpressureStrategy(observableOptions.BackpressureStrategy())
var ch chan interface{}
if observableOptions.BackpressureStrategy() == options.Buffer {
ch = make(chan interface{}, observableOptions.Buffer())
} else {
ch = make(chan interface{})
}
ob.setChannel(ch)
c.observersMutex.Lock()
c.observers = append(c.observers, ob)
c.observersMutex.Unlock()

go func() {
for item := range ch {
switch item := item.(type) {
case error:
ob.OnError(item)
return
default:
ob.OnNext(item)
}
}
}()

return ob
}

func (c *connectableObservable) Connect() Observer {
source := make([]interface{}, 0)

it := c.iterator
for it.Next() {
item := it.Value()
source = append(source, item)
}

var wg sync.WaitGroup

for _, ob := range c.observers {
wg.Add(1)
local := make([]interface{}, len(source))
copy(local, source)

go func(ob Observer) {
defer wg.Done()
var e error
OuterLoop:
for _, item := range local {
switch item := item.(type) {
case error:
ob.OnError(item)

// Record error
e = item
break OuterLoop
out := NewObserver()
go func() {
it := c.iterator
for it.Next() {
item := it.Value()
c.observersMutex.Lock()
for _, observer := range c.observers {
c.observersMutex.Unlock()
select {
case observer.getChannel() <- item:
default:
ob.OnNext(item)
}
c.observersMutex.Lock()
}

if e == nil {
ob.OnDone()
} else {
ob.OnError(e)
}
}(ob)
}

ob := NewObserver()
go func() {
wg.Wait()
ob.OnDone()
c.observersMutex.Unlock()
}
}()

return ob
return out
}
45 changes: 12 additions & 33 deletions connectableobservable_test.go
@@ -1,45 +1,24 @@
package rxgo

import (
"errors"
"testing"

"fmt"
"github.com/reactivex/rxgo/handlers"
"github.com/stretchr/testify/assert"
"github.com/reactivex/rxgo/options"
"testing"
"time"
)

func TestConnect(t *testing.T) {
just := Just(1, 2, 3).Publish()
obs := Just(1, 2, 3).Publish()
got1 := make([]interface{}, 0)
got2 := make([]interface{}, 0)

just.Subscribe(handlers.NextFunc(func(i interface{}) {
obs.Subscribe(handlers.NextFunc(func(i interface{}) {
got1 = append(got1, i)
}))

just.Subscribe(handlers.NextFunc(func(i interface{}) {
got2 = append(got2, i)
}))

just.Connect().Block()
assert.Equal(t, []interface{}{1, 2, 3}, got1)
assert.Equal(t, []interface{}{1, 2, 3}, got2)
}

func TestConnectOnError(t *testing.T) {
just := Just(1, 2, 3, errors.New("foo"), 4).Publish()
got1 := make([]interface{}, 0)
got2 := make([]interface{}, 0)

just.Subscribe(handlers.NextFunc(func(i interface{}) {
got1 = append(got1, i)
}))

just.Subscribe(handlers.NextFunc(func(i interface{}) {
got2 = append(got2, i)
}))
time.Sleep(200 * time.Millisecond) // Pause the observer on purpose
}), options.WithBufferBackpressureStrategy(1))
//}), options.WithDropBackpressureStrategy())
obs.Connect()

just.Connect().Block()
assert.Equal(t, []interface{}{1, 2, 3}, got1)
assert.Equal(t, []interface{}{1, 2, 3}, got2)
time.Sleep(500 * time.Millisecond) // Ugly wait just for the example
fmt.Printf("%v\n", got1)
}
2 changes: 1 addition & 1 deletion observable.go
Expand Up @@ -566,7 +566,7 @@ func (o *observable) ForEach(nextFunc handlers.NextFunc, errFunc handlers.ErrFun
// Publish returns a ConnectableObservable which waits until its connect method
// is called before it begins emitting items to those Observers that have subscribed to it.
func (o *observable) Publish() ConnectableObservable {
return newConnectableObservableFromObservable(o)
return newConnectableObservable(o)
}

func (o *observable) All(predicate Predicate) Single {
Expand Down
36 changes: 30 additions & 6 deletions observer.go
@@ -1,6 +1,7 @@
package rxgo

import (
"github.com/reactivex/rxgo/options"
"sync"

"github.com/reactivex/rxgo/handlers"
Expand All @@ -16,15 +17,38 @@ type Observer interface {
OnDone()

Block() error
setChannel(chan interface{})
getChannel() chan interface{}
setBackpressureStrategy(options.BackpressureStrategy)
getBackpressureStrategy() options.BackpressureStrategy
}

type observer struct {
disposedMutex sync.Mutex
disposed bool
nextHandler handlers.NextFunc
errHandler handlers.ErrFunc
doneHandler handlers.DoneFunc
done chan error
disposedMutex sync.Mutex
disposed bool
nextHandler handlers.NextFunc
errHandler handlers.ErrFunc
doneHandler handlers.DoneFunc
done chan error
channel chan interface{}
backpressureStrategy options.BackpressureStrategy
buffer int
}

func (o *observer) setBackpressureStrategy(strategy options.BackpressureStrategy) {
o.backpressureStrategy = strategy
}

func (o *observer) getBackpressureStrategy() options.BackpressureStrategy {
return o.backpressureStrategy
}

func (o *observer) setChannel(ch chan interface{}) {
o.channel = ch
}

func (o *observer) getChannel() chan interface{} {
return o.channel
}

// NewObserver constructs a new Observer instance with default Observer and accept
Expand Down
40 changes: 33 additions & 7 deletions options/options.go
@@ -1,26 +1,39 @@
package options

type BackpressureStrategy uint32

const (
Drop BackpressureStrategy = iota
Buffer
)

// Option is the configuration of an observable
type Option interface {
apply(*funcOption)
Parallelism() int
BufferedChannelCapacity() int
Buffer() int
BackpressureStrategy() BackpressureStrategy
}

// funcOption wraps a function that modifies options into an
// implementation of the Option interface.
type funcOption struct {
f func(*funcOption)
parallelism int
bufferedChannelCapacity int
f func(*funcOption)
parallelism int
buffer int
bpStrategy BackpressureStrategy
}

func (fdo *funcOption) Parallelism() int {
return fdo.parallelism
}

func (fdo *funcOption) BufferedChannelCapacity() int {
return fdo.bufferedChannelCapacity
func (fdo *funcOption) Buffer() int {
return fdo.buffer
}

func (fdo *funcOption) BackpressureStrategy() BackpressureStrategy {
return fdo.bpStrategy
}

func (fdo *funcOption) apply(do *funcOption) {
Expand Down Expand Up @@ -53,6 +66,19 @@ func WithParallelism(parallelism int) Option {
// WithBufferedChannel allows to configure the capacity of a buffered channel
func WithBufferedChannel(capacity int) Option {
return newFuncOption(func(options *funcOption) {
options.bufferedChannelCapacity = capacity
options.buffer = capacity
})
}

func WithDropBackpressureStrategy() Option {
return newFuncOption(func(options *funcOption) {
options.bpStrategy = Drop
})
}

func WithBufferBackpressureStrategy(buffer int) Option {
return newFuncOption(func(options *funcOption) {
options.bpStrategy = Buffer
options.buffer = buffer
})
}
2 changes: 1 addition & 1 deletion options/options_test.go
Expand Up @@ -10,5 +10,5 @@ func TestOptions(t *testing.T) {
option := ParseOptions(WithParallelism(1), WithBufferedChannel(2))

assert.Equal(t, option.Parallelism(), 1)
assert.Equal(t, option.BufferedChannelCapacity(), 2)
assert.Equal(t, option.Buffer(), 2)
}
36 changes: 36 additions & 0 deletions single_test.go
@@ -1,7 +1,9 @@
package rxgo

import (
"fmt"
"testing"
"time"

"github.com/reactivex/rxgo/handlers"
"github.com/reactivex/rxgo/optional"
Expand Down Expand Up @@ -78,3 +80,37 @@ func TestSingleMapWithTwoSubscription(t *testing.T) {
AssertThatSingle(t, just, HasValue(3))
AssertThatSingle(t, just, HasValue(3))
}

type Context interface {
isBlue() bool
}

func TestX(t *testing.T) {
ch := make(chan interface{})
go func() {
for {
v := <-ch
time.Sleep(500 * time.Millisecond)
fmt.Printf("%v\n", v)
}
}()

time.Sleep(1 * time.Second)
msg := "hi"
select {
case ch <- msg:
fmt.Println("sent message", msg)
default:
fmt.Println("no message sent")
}
time.Sleep(1 * time.Second)

select {
case ch <- msg:
fmt.Println("sent message", msg)
default:
fmt.Println("no message sent")
}

time.Sleep(1 * time.Second)
}

0 comments on commit 57ebee4

Please sign in to comment.