Skip to content

Commit

Permalink
Returned disposed channel in connect
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed May 13, 2020
1 parent bd899f3 commit ac83c7a
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 15 deletions.
22 changes: 15 additions & 7 deletions README.md
Expand Up @@ -361,25 +361,33 @@ Now, with a Connectable Observable:
```go
ch := make(chan rxgo.Item)
go func() {
ch <- rxgo.Of(1)
ch <- rxgo.Of(2)
ch <- rxgo.Of(3)
close(ch)
ch <- rxgo.Of(1)
ch <- rxgo.Of(2)
ch <- rxgo.Of(3)
close(ch)
}()
// Create a Connectable Observable
observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())

// Create the first Observer
observable.DoOnNext(func(i interface{}) {
fmt.Printf("First observer: %d\n", i)
fmt.Printf("First observer: %d\n", i)
})

// Create the second Observer
observable.DoOnNext(func(i interface{}) {
fmt.Printf("Second observer: %d\n", i)
fmt.Printf("Second observer: %d\n", i)
})

observable.Connect()
disposed, cancel := observable.Connect()
go func() {
// Do something
time.Sleep(time.Second)
// Then cancel the subscription
cancel()
}()
// Wait for the subscription to be disposed
<-disposed
```

```
Expand Down
9 changes: 5 additions & 4 deletions factory_connectable_test.go
Expand Up @@ -51,7 +51,8 @@ func Test_Connectable_IterableChannel_Disposed(t *testing.T) {
obs := &ObservableImpl{
iterable: newChannelIterable(ch, WithPublishStrategy()),
}
obs.Connect()()
_, disposable := obs.Connect()
disposable()
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
time.Sleep(50 * time.Millisecond)
Expand Down Expand Up @@ -111,9 +112,9 @@ func Test_Connectable_IterableCreate_Disposed(t *testing.T) {
cancel()
}}, WithPublishStrategy(), WithContext(ctx)),
}
obs.Connect()()
ctx, cancel = context.WithTimeout(context.Background(), 550*time.Millisecond)
defer cancel()
obs.Connect()
_, cancel2 := context.WithTimeout(context.Background(), 550*time.Millisecond)
defer cancel2()
time.Sleep(50 * time.Millisecond)
Assert(ctx, t, obs, IsEmpty())
}
Expand Down
2 changes: 1 addition & 1 deletion observable.go
Expand Up @@ -25,7 +25,7 @@ type Observable interface {
BufferWithCount(count int, opts ...Option) Observable
BufferWithTime(timespan Duration, opts ...Option) Observable
BufferWithTimeOrCount(timespan Duration, count int, opts ...Option) Observable
Connect() Disposable
Connect() (Disposed, Disposable)
Contains(equal Predicate, opts ...Option) Single
Count(opts ...Option) Single
Debounce(timespan Duration, opts ...Option) Observable
Expand Down
9 changes: 7 additions & 2 deletions observable_operator.go
Expand Up @@ -594,10 +594,15 @@ func (o *ObservableImpl) BufferWithTimeOrCount(timespan Duration, count int, opt
}

// Connect instructs a connectable Observable to begin emitting items to its subscribers.
func (o *ObservableImpl) Connect() Disposable {
func (o *ObservableImpl) Connect() (Disposed, Disposable) {
ctx, cancel := context.WithCancel(context.Background())
o.Observe(WithContext(ctx), connect())
return Disposable(cancel)
ch := make(chan struct{})
go func() {
<-ctx.Done()
close(ch)
}()
return ch, Disposable(cancel)
}

// Contains determines whether an Observable emits a particular item or not.
Expand Down
2 changes: 1 addition & 1 deletion observable_operator_test.go
Expand Up @@ -1298,7 +1298,7 @@ func Test_Observable_Serialize_Duplicates(t *testing.T) {
Serialize(1, func(i interface{}) int {
return i.(int)
})
Assert(context.Background(), t, obs, HasItems(1, 2, 3, 4, 5))
Assert(context.Background(), t, obs, HasItems(1, 2, 3, 4, 5, 6))
}

func Test_Observable_Serialize_Loop(t *testing.T) {
Expand Down

0 comments on commit ac83c7a

Please sign in to comment.