Permalink
Browse files

changed primary connection between TDRouter and the outside world to …

…use streams

NOTE: right now in many cases they are still byte array streams
but in the future there is now a way to actually stream data out
  • Loading branch information...
mschoch committed Jul 7, 2012
1 parent 70d89ec commit 5c2dee506794c6865701a5b16b0b2d519a918245
@@ -1,12 +1,7 @@
package com.couchbase.touchdb.ektorp;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
import java.net.URISyntaxException;
import java.util.concurrent.CountDownLatch;
@@ -25,7 +20,6 @@
private TDURLConnection conn;
private TDRouter router;
private InputStream is;
- private OutputStream os;
final CountDownLatch doneSignal = new CountDownLatch(1);
public static TouchDBHttpResponse of(TDURLConnection conn, TDRouter router) throws IOException {
@@ -55,11 +49,7 @@ public InputStream getContent() {
} catch (InterruptedException e) {
Log.e(TDDatabase.TAG, "Interupted waiting for response", e);
}
- if(conn.isChunked()) {
- return is;
- } else {
- return new ByteArrayInputStream(((ByteArrayOutputStream)os).toByteArray());
- }
+ return is;
}
@Override
@@ -100,39 +90,10 @@ public void releaseConnection() {
/** TDRouterCallbackBlock **/
- @Override
- public synchronized void onDataAvailable(byte[] data) {
- try {
- os.write(data);
- os.flush();
- } catch (IOException e) {
- Log.e(TDDatabase.TAG, "Error wrtiting data", e);
- }
- }
-
- @Override
- public void onFinish() {
- try {
- os.close();
- } catch (IOException e) {
- Log.e(TDDatabase.TAG, "Error wrtiting data", e);
- }
-
- }
-
@Override
public void onResponseReady() {
doneSignal.countDown();
- if(conn.isChunked()) {
- is = new PipedInputStream();
- try {
- os = new PipedOutputStream((PipedInputStream)is);
- } catch (IOException e) {
- Log.e(TDDatabase.TAG, "Exception creating piped output stream", e);
- }
- } else {
- os = new ByteArrayOutputStream();
- }
+ is = conn.getResponseInputStream();
}
}
@@ -105,47 +105,29 @@ public void onResponseReady() {
}
}
}
- }
- @Override
- public void onFinish() {
- try {
- os.close();
- } catch (IOException e) {
- //ignore
- } finally {
- //signal the end no matter what happens at the end of this method
- doneSignal.countDown();
- }
+ doneSignal.countDown();
}
- @Override
- public synchronized void onDataAvailable(byte[] data) {
- if(data != null) {
- try {
- Log.v(TAG, String.format("Asked to write: %s", new String(data)));
- os.write(data);
- os.flush();
- response.flushBuffer();
- } catch (IOException e) {
- //dont bother logging this, it usually just means that a continuous changes listener hung up
- }
- }
- }
};
router.setCallbackBlock(callbackBlock);
- listener.onServerThread(new Runnable() {
-
- @Override
- public void run() {
- router.start();
- }
- });
+ synchronized (server) {
+ router.start();
+ }
try {
doneSignal.await();
+ InputStream responseInputStream = conn.getResponseInputStream();
+ final byte[] buffer = new byte[65536];
+ int r;
+ while ((r = responseInputStream.read(buffer)) > 0) {
+ os.write(buffer, 0, r);
+ os.flush();
+ response.flushBuffer();
+ }
+ os.close();
} catch (InterruptedException e) {
Log.e(TDDatabase.TAG, "Interrupted waiting for result", e);
} finally {
@@ -1,7 +1,9 @@
package com.couchbase.touchdb.router;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.net.URL;
@@ -401,13 +403,19 @@ public void start() {
// If response is ready (nonzero status), tell my client about it:
if(status.getCode() != 0) {
connection.setResponseCode(status.getCode());
- sendResponse();
- if(callbackBlock != null && connection.getResponseBody() != null) {
- callbackBlock.onDataAvailable(connection.getResponseBody().getJson());
- }
- if(callbackBlock != null && !waiting) {
- callbackBlock.onFinish();
+
+ if(connection.getResponseBody() != null) {
+ ByteArrayInputStream bais = new ByteArrayInputStream(connection.getResponseBody().getJson());
+ connection.setResponseInputStream(bais);
+ } else {
+
+ try {
+ connection.getResponseOutputStream().close();
+ } catch (IOException e) {
+ Log.e(TDDatabase.TAG, "Error closing empty output stream");
+ }
}
+ sendResponse();
}
}
@@ -880,7 +888,13 @@ public void sendContinuousChange(TDRevision rev) {
String jsonString = TDServer.getObjectMapper().writeValueAsString(changeDict);
if(callbackBlock != null) {
byte[] json = (jsonString + "\n").getBytes();
- callbackBlock.onDataAvailable(json);
+ OutputStream os = connection.getResponseOutputStream();
+ try {
+ os.write(json);
+ os.flush();
+ } catch (Exception e) {
+ Log.e(TDDatabase.TAG, "IOException writing to internal streams", e);
+ }
}
} catch (Exception e) {
Log.w("Unable to serialize change to JSON", e);
@@ -912,8 +926,13 @@ public void update(Observable observable, Object changeObject) {
} catch (Exception e) {
Log.w(TDDatabase.TAG, "Error serializing JSON", e);
}
- callbackBlock.onDataAvailable(data);
- callbackBlock.onFinish();
+ OutputStream os = connection.getResponseOutputStream();
+ try {
+ os.write(data);
+ os.close();
+ } catch (IOException e) {
+ Log.e(TDDatabase.TAG, "IOException writing to internal streams", e);
+ }
}
} else {
Log.w(TDDatabase.TAG, "TDRouter: Sending continous change chunk");
@@ -4,8 +4,4 @@
void onResponseReady();
- void onDataAvailable(byte[] data);
-
- void onFinish();
-
}
@@ -2,7 +2,10 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
import java.net.HttpURLConnection;
import java.net.ProtocolException;
import java.net.URL;
@@ -16,7 +19,10 @@
import java.util.SortedMap;
import java.util.TreeMap;
+import android.util.Log;
+
import com.couchbase.touchdb.TDBody;
+import com.couchbase.touchdb.TDDatabase;
public class TDURLConnection extends HttpURLConnection {
@@ -33,8 +39,17 @@
private static final String PUT = "PUT";
private static final String HEAD = "HEAD";
+ private OutputStream responseOutputStream;
+ private InputStream responseInputStream;
+
public TDURLConnection(URL url) {
super(url);
+ responseInputStream = new PipedInputStream();
+ try {
+ responseOutputStream = new PipedOutputStream((PipedInputStream)responseInputStream);
+ } catch (IOException e) {
+ Log.e(TDDatabase.TAG, "Exception creating piped output stream", e);
+ }
}
@Override
@@ -209,6 +224,27 @@ public boolean isChunked() {
return chunked;
}
+ public void setResponseInputStream(InputStream responseInputStream) {
+ this.responseInputStream = responseInputStream;
+ }
+
+ public InputStream getResponseInputStream() {
+ return responseInputStream;
+ }
+
+ public void setResponseOutputStream(OutputStream responseOutputStream) {
+ this.responseOutputStream = responseOutputStream;
+ }
+
+ public OutputStream getResponseOutputStream() {
+ return responseOutputStream;
+ }
+
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return responseInputStream;
+ }
+
}
/**

0 comments on commit 5c2dee5

Please sign in to comment.