From 9d7159e3f86d161c205e5d3e6c8c646f695e1b3a Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Fri, 5 Dec 2014 21:46:56 +1100 Subject: [PATCH] add check for unsubscribed before next read to StringObservable.from(Reader) and unit test --- .../java/rx/observables/StringObservable.java | 3 ++- .../rx/observables/StringObservableTest.java | 18 ++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/src/main/java/rx/observables/StringObservable.java b/src/main/java/rx/observables/StringObservable.java index a87806a..5a9a9b2 100644 --- a/src/main/java/rx/observables/StringObservable.java +++ b/src/main/java/rx/observables/StringObservable.java @@ -176,7 +176,8 @@ public void call(Subscriber o) { n = i.read(buffer); while (n != -1 && !o.isUnsubscribed()) { o.onNext(new String(buffer, 0, n)); - n = i.read(buffer); + if (!o.isUnsubscribed()) + n = i.read(buffer); } } catch (IOException e) { o.onError(e); diff --git a/src/test/java/rx/observables/StringObservableTest.java b/src/test/java/rx/observables/StringObservableTest.java index 3a3cb82..6aac7ad 100644 --- a/src/test/java/rx/observables/StringObservableTest.java +++ b/src/test/java/rx/observables/StringObservableTest.java @@ -38,6 +38,7 @@ import java.io.ByteArrayInputStream; import java.io.FilterReader; import java.io.IOException; +import java.io.InputStreamReader; import java.io.Reader; import java.io.StringReader; import java.nio.charset.Charset; @@ -303,6 +304,23 @@ public void testFromReader() { assertNotSame(inStr, outStr); assertEquals(inStr, outStr); } + + @Test + public void testFromReaderWillUnsubscribeBeforeCallingNextRead() { + final byte[] inBytes = "test".getBytes(); + final AtomicInteger numReads = new AtomicInteger(0); + ByteArrayInputStream is = new ByteArrayInputStream(inBytes) { + + @Override + public synchronized int read(byte[] b, int off, int len) { + numReads.incrementAndGet(); + return super.read(b, off, len); + } + }; + StringObservable.from(new InputStreamReader(is)).first().toBlocking() + .single(); + assertEquals(1, numReads.get()); + } @Test public void testByLine() {