Skip to content

Commit

Permalink
Migration of the operators in a dedicated file.
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Dec 12, 2018
1 parent 47f0742 commit 217aec0
Show file tree
Hide file tree
Showing 4 changed files with 1,235 additions and 1,104 deletions.
38 changes: 15 additions & 23 deletions connectableobservable_test.go
@@ -1,13 +1,5 @@
package rxgo

import (
"fmt"
"github.com/reactivex/rxgo/handlers"
"github.com/reactivex/rxgo/options"
"testing"
"time"
)

//func TestConnect(t *testing.T) {
// obs := Just(1, 2, 3).Publish()
// got1 := make([]interface{}, 0)
Expand Down Expand Up @@ -35,18 +27,18 @@ import (
// fmt.Printf("%v\n", got1)
//}

func TestConnect2(t *testing.T) {
obs := Just(1, 2, 3).Map(func(i interface{}) interface{} {
return 1 + i.(int)
}).Publish()
got1 := make([]interface{}, 0)

obs.Subscribe(handlers.NextFunc(func(i interface{}) {
got1 = append(got1, i)
time.Sleep(200 * time.Millisecond) // Pause the observer on purpose
}), options.WithBufferBackpressureStrategy(1))
obs.Connect()

time.Sleep(500 * time.Millisecond) // Ugly wait just for the example
fmt.Printf("%v\n", got1)
}
//func TestConnect2(t *testing.T) {
// obs := Just(1, 2, 3).Map(func(i interface{}) interface{} {
// return 1 + i.(int)
// }).Publish()
// got1 := make([]interface{}, 0)
//
// obs.Subscribe(handlers.NextFunc(func(i interface{}) {
// got1 = append(got1, i)
// time.Sleep(200 * time.Millisecond) // Pause the observer on purpose
// }), options.WithBufferBackpressureStrategy(1))
// obs.Connect()
//
// time.Sleep(500 * time.Millisecond) // Ugly wait just for the example
// fmt.Printf("%v\n", got1)
//}
3 changes: 3 additions & 0 deletions fx.go
Expand Up @@ -41,4 +41,7 @@ type (

// Supplier defines a function that supplies a result from nothing.
Supplier func() interface{}

// ChanProducer defines a function that produces results into a channel.
ChanProducer func(chan interface{})
)

0 comments on commit 217aec0

Please sign in to comment.