Skip to content

Commit

Permalink
Merge pull request #3167 from akarnokd/ReplayManageRequestsFix
Browse files Browse the repository at this point in the history
Fixed negative request due to unsubscription of a large requester
  • Loading branch information
akarnokd committed Aug 24, 2015
2 parents 7a77072 + 9d7bd8b commit e588446
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/main/java/rx/internal/operators/OperatorReplay.java
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ void manageRequests() {
InnerProducer<T>[] a = producers.get();

long ri = maxChildRequested;
long maxTotalRequests = 0;
long maxTotalRequests = ri;

for (InnerProducer<T> rp : a) {
maxTotalRequests = Math.max(maxTotalRequests, rp.totalRequested.get());
Expand Down
25 changes: 25 additions & 0 deletions src/test/java/rx/internal/operators/OperatorReplayTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1120,4 +1120,29 @@ public void onNext(Integer t) {
ts.assertNotCompleted();
ts.assertError(TestException.class);
}

@Test
public void unboundedLeavesEarly() {
PublishSubject<Integer> source = PublishSubject.create();

final List<Long> requests = new ArrayList<Long>();

Observable<Integer> out = source
.doOnRequest(new Action1<Long>() {
@Override
public void call(Long t) {
requests.add(t);
}
}).replay().autoConnect();

TestSubscriber<Integer> ts1 = TestSubscriber.create(5);
TestSubscriber<Integer> ts2 = TestSubscriber.create(10);

out.subscribe(ts1);
out.subscribe(ts2);
ts2.unsubscribe();

Assert.assertEquals(Arrays.asList(5L, 5L), requests);
}

}

0 comments on commit e588446

Please sign in to comment.