Skip to content

CombineLatest on observables created with Defer #327

@philippseith

Description

@philippseith

Describe the bug
When creating an Observable with Defer, every call to Observe restarts the producer and every observer gets the same values.
But this is not the case when the Observable is transformed with operators that itself call Observe on that Observable, like CombineLatest. In contrast to that, the Map operator is respecting the Defer origin of the Observable.

To Reproduce
Steps to reproduce the behavior:

func TestCombineLatest(t *testing.T) {
	o := rxgo.Defer([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
		next <- rxgo.Of(1)
		next <- rxgo.Of(2)
		next <- rxgo.Of(3)
	}})
	co := rxgo.CombineLatest(func(i ...interface{}) interface{} {
		return i[0]
	}, []rxgo.Observable{o})
	ch1 := co.Observe()
	ch2 := co.Observe()
	rr1 := make([]interface{}, 0)
	rr2 := make([]interface{}, 0)
	done1 := make(chan struct{}, 1)
	go func() {
		for i := 0; i < 3; i++ {
			r1 := <-ch1
			rr1 = append(rr1, r1.V)
		}
		close(done1)
	}()
	done2 := make(chan struct{}, 1)
	go func() {
		for i := 0; i < 3; i++ {
			r2 := <-ch2
			rr2 = append(rr2, r2.V)
		}
		close(done2)
	}()
	<-done1
	<-done2
	Expect(rr1).To(Equal(rr2))
}

The Producer is called only once. Depending on the scheduling 1, 2, 3 is distributed on rr1 and rr2, when no more values are available, rr1, rr2 get nils from the closed channels.

Expected behavior
Both observers should get the 1, 2, 3.

Additional context
The same pattern is working with System.Reactive.Linq in dotnet.

Metadata

Metadata

Assignees

Labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions