Skip to content

Commit

Permalink
Terminate the runloop when the output stream is closed
Browse files Browse the repository at this point in the history
	Change on 2016/12/16 by lukhnos <lukhnos@google.com>

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=142267425
  • Loading branch information
lukhnos authored and tomball committed Dec 16, 2016
1 parent cb77832 commit d6c5080
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 23 deletions.
Expand Up @@ -14,6 +14,7 @@


package com.google.j2objc.io; package com.google.j2objc.io;


import com.google.j2objc.annotations.Weak;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.logging.Logger; import java.util.logging.Logger;
Expand All @@ -26,28 +27,28 @@
]-*/ ]-*/


/** /**
* An NSInputStream adapter piped to an NSOutputStream that in turn requests data via a * An NSInputStream adapter piped to an NSOutputStream that in turn requests data via a {@link
* {@link java.io.OutputStream} asynchronously. * java.io.OutputStream} asynchronously.
* *
* <p>The main use case is to enable J2ObjC apps to obtain an NSInputStream that they can offer data * <p>The main use case is to enable J2ObjC apps to obtain an NSInputStream that they can offer data
* to and then pass the stream to another object that takes one. NSMutableURLRequest's * to and then pass the stream to another object that takes one. NSMutableURLRequest's
* HTTPBodyStream is one such example. * HTTPBodyStream is one such example.
* *
* <p>The fundamental problem here is that streams in Java and streams in Objective-C (Foundation * <p>The fundamental problem here is that streams in Java and streams in Objective-C (Foundation to
* to be more precise) have different designs. If you pipe an NSOutputStream to an NSInputStream, * be more precise) have different designs. If you pipe an NSOutputStream to an NSInputStream, that
* that output stream requests data from you in an asynchronous manner using a callback, whereas * output stream requests data from you in an asynchronous manner using a callback, whereas Java's
* Java's OutputStream is synchronous. In addition, OutputStream.write(byte[], int, int) assumes * OutputStream is synchronous. In addition, OutputStream.write(byte[], int, int) assumes that all
* that all the bytes will be written to in one go, whereas -[NSOutputStream read:maxLength:] * the bytes will be written to in one go, whereas -[NSOutputStream read:maxLength:] returns the
* returns the actual bytes written. * actual bytes written.
* *
* <p>To use this adapter, call the {@link #open(Delegate, int)} method. It returns a native * <p>To use this adapter, call the {@link #create(Delegate, int)} method. It returns a native
* NSInputStream that you can pass to the target data consumer. To write data to this piped stream, * NSInputStream that you can pass to the target data consumer. To write data to this piped stream,
* implement the sole delegate method, and use the suppiled Java OutputStream to offer data. * implement the sole delegate method, and use the suppiled Java OutputStream to offer data.
* *
* <p>If you need to offer your data synchronously, you will need to consider using a pair of * <p>If you need to offer your data synchronously, you will need to consider using a pair of {@link
* {@link java.io.PipedInputStream} and {@link java.io.PipedOutputStream}, and offer the data * java.io.PipedInputStream} and {@link java.io.PipedOutputStream}, and offer the data using the
* using the PipedOutputStream, and in your {@link Delegate#offerData(OutputStream)}, read data * PipedOutputStream, and in your {@link Delegate#offerData(OutputStream)}, read data from the
* from the PipedInputStream. * PipedInputStream.
* *
* <p>It is safe to close the provided OutputStream multiple times. It is also safe to send -close * <p>It is safe to close the provided OutputStream multiple times. It is also safe to send -close
* to the underlying NSInputStream and NSOutputStream more than once. If the NSInputStream is closed * to the underlying NSInputStream and NSOutputStream more than once. If the NSInputStream is closed
Expand Down Expand Up @@ -78,7 +79,7 @@ static final class OutputStreamAdapter extends OutputStream {
private Delegate delegate; private Delegate delegate;
private Object nativeOutputStream; // NSOutputStream private Object nativeOutputStream; // NSOutputStream
private Object leftoverData; // NSData private Object leftoverData; // NSData
private Object threadForClosing; // NSThread @Weak private Object threadForClosing; // NSThread


/** If true, once the remaining leftover data is written, close() will be called. */ /** If true, once the remaining leftover data is written, close() will be called. */
private boolean closeAfterLeftoverCleared; private boolean closeAfterLeftoverCleared;
Expand Down Expand Up @@ -195,13 +196,18 @@ native void scheduleClose() /*-[
]-*/; ]-*/;


/*-[ /*-[
// Closes the stream *and* removes the stream from the run loop. // Closes the stream *and* removes the stream from the runloop.
- (void)doClose { - (void)doClose {
[(NSOutputStream *)nativeOutputStream_ close]; [(NSOutputStream *)nativeOutputStream_ close];
[(NSOutputStream *)nativeOutputStream_ removeFromRunLoop:[NSRunLoop currentRunLoop] [(NSOutputStream *)nativeOutputStream_ removeFromRunLoop:[NSRunLoop currentRunLoop]
forMode:NSRunLoopCommonModes]; forMode:NSRunLoopCommonModes];
[(NSOutputStream *)nativeOutputStream_ setDelegate:nil];
[delegate_ release]; [delegate_ release];
delegate_ = nil; delegate_ = nil;
threadForClosing_ = nil;
// Stop the runloop. After the runloop exits, -run will exit, and the thread will terminate.
CFRunLoopStop(CFRunLoopGetCurrent());
} }
// Schedules the output stream in the dedicated thread's runloop. // Schedules the output stream in the dedicated thread's runloop.
Expand Down Expand Up @@ -266,7 +272,20 @@ native void scheduleClose() /*-[


private AsyncPipedNSInputStreamAdapter() {} private AsyncPipedNSInputStreamAdapter() {}


public static Object open(Delegate delegate, int bufferSize) { /**
* Creates a native NSInputStream that is piped to a NSOutpuStream, which in turn requests data
* from the supplied delegate asynchronously.
*
* <p>Please note that the returned NSInputStream is not yet open. This is to allow the stream to
* be used by other Foundation API (such as NSMutableURLRequest) and is consistent with other
* NSInputStream initializers.
*
* @param delegate the delegate.
* @param bufferSize the size of the internal buffer used to pipe the NSOutputStream to the
* NSInputStream.
* @return a native NSInputStream.
*/
public static Object create(Delegate delegate, int bufferSize) {
if (bufferSize < 1) { if (bufferSize < 1) {
throw new IllegalArgumentException("Invalid buffer size: " + bufferSize); throw new IllegalArgumentException("Invalid buffer size: " + bufferSize);
} }
Expand All @@ -275,10 +294,10 @@ public static Object open(Delegate delegate, int bufferSize) {
throw new IllegalArgumentException("Delegate must not be null"); throw new IllegalArgumentException("Delegate must not be null");
} }


return nativeOpen(delegate, bufferSize); return nativeCreate(delegate, bufferSize);
} }


static native Object nativeOpen(Delegate delegate, int bufferSize) /*-[ static native Object nativeCreate(Delegate delegate, int bufferSize) /*-[
CFReadStreamRef readStreamRef; CFReadStreamRef readStreamRef;
CFWriteStreamRef writeStreamRef; CFWriteStreamRef writeStreamRef;
Expand Down
Expand Up @@ -184,7 +184,7 @@ protected void tearDown() throws Exception {
public void testFullWriteAndRead() { public void testFullWriteAndRead() {
DataProvider provider = new DataProvider(); DataProvider provider = new DataProvider();
NativeInputStreamConsumer consumer = new NativeInputStreamConsumer(); NativeInputStreamConsumer consumer = new NativeInputStreamConsumer();
Object stream = AsyncPipedNSInputStreamAdapter.open(provider, STREAM_BUFFER_SIZE); Object stream = AsyncPipedNSInputStreamAdapter.create(provider, STREAM_BUFFER_SIZE);
consumer.readUntilEnd(stream); consumer.readUntilEnd(stream);


assertTrue("The entire source is read", Arrays.equals(randomData, consumer.getBytes())); assertTrue("The entire source is read", Arrays.equals(randomData, consumer.getBytes()));
Expand All @@ -194,7 +194,7 @@ public void testFullWriteAndRead() {
public void testNothingWritten() { public void testNothingWritten() {
DataProvider provider = new DataProvider(0); DataProvider provider = new DataProvider(0);
NativeInputStreamConsumer consumer = new NativeInputStreamConsumer(); NativeInputStreamConsumer consumer = new NativeInputStreamConsumer();
Object stream = AsyncPipedNSInputStreamAdapter.open(provider, STREAM_BUFFER_SIZE); Object stream = AsyncPipedNSInputStreamAdapter.create(provider, STREAM_BUFFER_SIZE);
consumer.readUntilEnd(stream); consumer.readUntilEnd(stream);
assertEquals(0, provider.getTotalWritten()); assertEquals(0, provider.getTotalWritten());
assertEquals(0, consumer.getBytes().length); assertEquals(0, consumer.getBytes().length);
Expand All @@ -204,7 +204,7 @@ public void testNothingWritten() {
public void testNothingRead() { public void testNothingRead() {
DataProvider provider = new DataProvider(); DataProvider provider = new DataProvider();
NativeInputStreamConsumer consumer = new NativeInputStreamConsumer(0); NativeInputStreamConsumer consumer = new NativeInputStreamConsumer(0);
Object stream = AsyncPipedNSInputStreamAdapter.open(provider, STREAM_BUFFER_SIZE); Object stream = AsyncPipedNSInputStreamAdapter.create(provider, STREAM_BUFFER_SIZE);
consumer.readUntilEnd(stream); consumer.readUntilEnd(stream);
assertTrue("May provide more than actually read", provider.getTotalWritten() >= 0); assertTrue("May provide more than actually read", provider.getTotalWritten() >= 0);
assertEquals(0, consumer.getBytes().length); assertEquals(0, consumer.getBytes().length);
Expand All @@ -214,7 +214,7 @@ public void testNothingRead() {
public void testPartialRead() { public void testPartialRead() {
DataProvider provider = new DataProvider(); DataProvider provider = new DataProvider();
NativeInputStreamConsumer consumer = new NativeInputStreamConsumer(PARTIAL_SIZE); NativeInputStreamConsumer consumer = new NativeInputStreamConsumer(PARTIAL_SIZE);
Object stream = AsyncPipedNSInputStreamAdapter.open(provider, STREAM_BUFFER_SIZE); Object stream = AsyncPipedNSInputStreamAdapter.create(provider, STREAM_BUFFER_SIZE);
consumer.readUntilEnd(stream); consumer.readUntilEnd(stream);
assertTrue("May provide more than actually read", provider.getTotalWritten() >= PARTIAL_SIZE); assertTrue("May provide more than actually read", provider.getTotalWritten() >= PARTIAL_SIZE);
assertEquals(PARTIAL_SIZE, consumer.getBytes().length); assertEquals(PARTIAL_SIZE, consumer.getBytes().length);
Expand All @@ -225,7 +225,7 @@ public void testPartialRead() {
public void testPartialWrite() { public void testPartialWrite() {
DataProvider provider = new DataProvider(PARTIAL_SIZE); DataProvider provider = new DataProvider(PARTIAL_SIZE);
NativeInputStreamConsumer consumer = new NativeInputStreamConsumer(); NativeInputStreamConsumer consumer = new NativeInputStreamConsumer();
Object stream = AsyncPipedNSInputStreamAdapter.open(provider, STREAM_BUFFER_SIZE); Object stream = AsyncPipedNSInputStreamAdapter.create(provider, STREAM_BUFFER_SIZE);
consumer.readUntilEnd(stream); consumer.readUntilEnd(stream);
assertEquals(PARTIAL_SIZE, provider.getTotalWritten()); assertEquals(PARTIAL_SIZE, provider.getTotalWritten());
assertEquals(PARTIAL_SIZE, consumer.getBytes().length); assertEquals(PARTIAL_SIZE, consumer.getBytes().length);
Expand Down

0 comments on commit d6c5080

Please sign in to comment.