Skip to content

Conversation

@NiteshKant
Copy link
Member

UnicastContentSubject delegates buffering to a BufferUntilSubscriber subject. However, it subscribes the downstream subscriber to this subject using Observable.unsubscribe(). This causes a SafeSubscriber subscribing to the buffer subject which unsubscribes on termination of the upstream source.

The following code reproduces the issue:

        UnicastContentSubject<Long> subject = UnicastContentSubject.createWithoutNoSubscriptionTimeout();

        subject.onNext(1l);
        subject.onCompleted();

        subject.flatMap(aLong -> Observable.never()
                                           .doOnUnsubscribe(() -> System.out.println("unsubscribed")))
               .toBlocking().toFuture().get();

In the above code the flatmap returns Observable.never() which should not get unsubscribed, unless the eventual subscriber unsubscribes.

Using unsafeSubscribe() to subscribe to the BufferUntilSubscriber will not eagerly unsubscribe.

Nitesh Kant added 2 commits March 18, 2015 15:55
`UnicastContentSubject` delegates buffering to a `BufferUntilSubscriber` subject. However, it subscribes the downstream subscriber to this subject using `Observable.unsubscribe()`. This causes a `SafeSubscriber` subscribing to the buffer subject which unsubscribes on termination of the upstream source.

The following code reproduces the issue:

```java
        UnicastContentSubject<Long> subject = UnicastContentSubject.createWithoutNoSubscriptionTimeout();

        subject.onNext(1l);
        subject.onCompleted();

        subject.flatMap(aLong -> Observable.never()
                                           .doOnUnsubscribe(() -> System.out.println("unsubscribed")))
               .toBlocking().toFuture().get();
```

In the above code the flatmap returns `Observable.never()` which should not get unsubscribed, unless the eventual subscriber unsubscribes.

Using `unsafeSubscribe()` to subscribe to the `BufferUntilSubscriber` will not eagerly unsubscribe.
Conflicts:
	rxnetty/src/main/java/io/reactivex/netty/protocol/http/UnicastContentSubject.java
@NiteshKant
Copy link
Member Author

This impacts cases when the lifetime of the child Observable returned from flatmap is more than the lifecycle of the inbound request content stream.

@diptanu
Copy link
Contributor

diptanu commented Mar 18, 2015

LGTM. Get a release out for me :)

NiteshKant added a commit that referenced this pull request Mar 18, 2015
UnicastContentSubject unsubscribes eagerly
@NiteshKant NiteshKant merged commit 45d08ce into ReactiveX:0.x Mar 18, 2015
@kurzweil
Copy link

I'm able to confirm that this fixed resolved the interrupted thread issue in my app.

@NiteshKant NiteshKant added this to the 0.4.7 milestone Mar 19, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants