-
Notifications
You must be signed in to change notification settings - Fork 342
Open
Labels
questionQuestion regarding how RxGo is working etc.Question regarding how RxGo is working etc.
Description
I'm trying to fan out the final step of my reactive flow to achieve parallel execution of that step using DoOnNext.
Running the code below, I expect thirdCounter = 2 and each of "first DoOnNext", "second DoOnNext" and "third DoOnNext" to be printed twice (6 times in total).
The printed output is as expected, and the Map steps concatenate the strings correctly. However, thirdCounter = 7, so the steps are invoked more often than expected.
What am I missing here?
My code:
// thirdCounter counts how many times the third Map stage in localRun has run.
// It is incremented with atomic.AddInt32.
var thirdCounter int32
// localRun pushes names through three Map stages that append ",one", ",two"
// and ",three" to each item, attaches three DoOnNext observers that print each
// item, prints the last item of the stream and finally prints thirdCounter.
//
// The function waits for all three DoOnNext observers to finish before it
// reads thirdCounter, and reads it atomically, so the reported value does not
// depend on goroutine scheduling.
//
// NOTE(review): each DoOnNext call and the Last().Observe() call subscribe to
// observable separately. It looks like the source built by rxgo.Just is
// replayed for every subscriber, so the Map stages would run once per item per
// subscriber rather than once per item — confirm against the RxGo
// documentation. A connectable observable (rxgo.WithPublishStrategy plus
// Connect) appears to be the intended way to share a single run.
func localRun(names ...string) {
	observable := rxgo.Just(names)().
		Map(func(_ context.Context, i interface{}) (interface{}, error) {
			return fmt.Sprintf("%s,%s", i.(string), "one"), nil
		}).
		Map(func(_ context.Context, i interface{}) (interface{}, error) {
			return fmt.Sprintf("%s,%s", i.(string), "two"), nil
		}).
		Map(func(_ context.Context, i interface{}) (interface{}, error) {
			// Count every invocation of the final stage; callbacks may run on
			// different goroutines, hence the atomic increment.
			atomic.AddInt32(&thirdCounter, 1)
			return fmt.Sprintf("%s,%s", i.(string), "three"), nil
		})

	// Per the RxGo API, DoOnNext returns a Disposed channel that is closed
	// once the observer has finished. Keep the channels so the function can
	// wait for the observers instead of racing with them.
	firstDone := observable.DoOnNext(func(i interface{}) {
		fmt.Println("first DoOnNext", i)
	})
	secondDone := observable.DoOnNext(func(i interface{}) {
		fmt.Println("second DoOnNext", i)
	})
	thirdDone := observable.DoOnNext(func(i interface{}) {
		fmt.Println("third DoOnNext", i)
	})

	for item := range observable.Last().Observe() {
		fmt.Println(item.V)
	}

	// Block until every observer is done so that all of their Map invocations
	// are reflected in the counter before it is reported.
	<-firstDone
	<-secondDone
	<-thirdDone

	// thirdCounter is written with atomic.AddInt32, so it must be read with an
	// atomic load as well; a plain read would be a data race.
	fmt.Printf("Third Counter = %d\n", atomic.LoadInt32(&thirdCounter))
}
// TestMocktFlow runs localRun with the two sample inputs "Hello" and "Hi".
// It makes no assertions; the result is inspected through the printed output.
func TestMocktFlow(t *testing.T) {
	greetings := []string{"Hello", "Hi"}
	localRun(greetings...)
}
Metadata
Metadata
Assignees
Labels
questionQuestion regarding how RxGo is working etc.Question regarding how RxGo is working etc.