From 9d7bd8bceff3fe4c0292632de0bbb353f9243216 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 20 Aug 2015 19:19:32 +0200 Subject: [PATCH] Fixed negative request due to unsubscription of a large requester --- .../rx/internal/operators/OperatorReplay.java | 2 +- .../operators/OperatorReplayTest.java | 25 +++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/src/main/java/rx/internal/operators/OperatorReplay.java b/src/main/java/rx/internal/operators/OperatorReplay.java index e1bf7aa352..b7b52aded3 100644 --- a/src/main/java/rx/internal/operators/OperatorReplay.java +++ b/src/main/java/rx/internal/operators/OperatorReplay.java @@ -501,7 +501,7 @@ void manageRequests() { InnerProducer[] a = producers.get(); long ri = maxChildRequested; - long maxTotalRequests = 0; + long maxTotalRequests = ri; for (InnerProducer rp : a) { maxTotalRequests = Math.max(maxTotalRequests, rp.totalRequested.get()); diff --git a/src/test/java/rx/internal/operators/OperatorReplayTest.java b/src/test/java/rx/internal/operators/OperatorReplayTest.java index 046803b082..c0ec384d84 100644 --- a/src/test/java/rx/internal/operators/OperatorReplayTest.java +++ b/src/test/java/rx/internal/operators/OperatorReplayTest.java @@ -1120,4 +1120,29 @@ public void onNext(Integer t) { ts.assertNotCompleted(); ts.assertError(TestException.class); } + + @Test + public void unboundedLeavesEarly() { + PublishSubject source = PublishSubject.create(); + + final List requests = new ArrayList(); + + Observable out = source + .doOnRequest(new Action1() { + @Override + public void call(Long t) { + requests.add(t); + } + }).replay().autoConnect(); + + TestSubscriber ts1 = TestSubscriber.create(5); + TestSubscriber ts2 = TestSubscriber.create(10); + + out.subscribe(ts1); + out.subscribe(ts2); + ts2.unsubscribe(); + + Assert.assertEquals(Arrays.asList(5L, 5L), requests); + } + } \ No newline at end of file