Skip to content

Commit

Permalink
Merge pull request #1288 from davidmoten/string-from-bytes
Browse files Browse the repository at this point in the history
Ensure StringObservable.from() does not perform unnecessary read
  • Loading branch information
benjchristensen committed May 30, 2014
2 parents 23da27a + 46b51cc commit 73ed329
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ public void call(Subscriber<? super byte[]> o) {
try {
if (o.isUnsubscribed())
return;
int n = 0;
n = i.read(buffer);
int n = i.read(buffer);
while (n != -1 && !o.isUnsubscribed()) {
o.onNext(Arrays.copyOf(buffer, n));
n = i.read(buffer);
if (!o.isUnsubscribed())
n = i.read(buffer);
}
} catch (IOException e) {
o.onError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.nio.charset.MalformedInputException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

Expand Down Expand Up @@ -246,6 +247,22 @@ public void testFromInputStream() {
assertArrayEquals(inBytes, outBytes);
}

@Test
public void testFromInputStreamWillUnsubscribeBeforeCallingNextRead() {
final byte[] inBytes = "test".getBytes();
final AtomicInteger numReads = new AtomicInteger(0);
ByteArrayInputStream is = new ByteArrayInputStream(inBytes) {

@Override
public synchronized int read(byte[] b, int off, int len) {
numReads.incrementAndGet();
return super.read(b, off, len);
}
};
StringObservable.from(is).first().toBlockingObservable().single();
assertEquals(1, numReads.get());
}

@Test
public void testFromReader() {
final String inStr = "test";
Expand Down

0 comments on commit 73ed329

Please sign in to comment.