Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Rewrite bucketsStreaming handler using Observer

Change-Id: Ie1f0c25f6746dcfe550b461fc9ac9c9d9c439943
Reviewed-on: http://review.couchbase.org/11827
Tested-by: Sergey Avseyev <sergey.avseyev@gmail.com>
Reviewed-by: Matt Ingenthron <matt@couchbase.com>
Tested-by: Matt Ingenthron <matt@couchbase.com>
  • Loading branch information...
commit 71341874274b778ae74a685f9e16ff957751c063 1 parent 572ad8f
@avsej avsej authored ingenthr committed
View
70 src/main/java/org/couchbase/mock/CouchbaseMock.java
@@ -29,11 +29,9 @@
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
+import java.util.Observable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import org.couchbase.mock.Bucket.BucketType;
import org.couchbase.mock.http.Authenticator;
import org.couchbase.mock.http.PoolsHandler;
@@ -57,22 +55,15 @@
private HttpServer httpServer;
private Authenticator authenticator;
private ArrayList<Thread> nodeThreads;
- private final Lock configLock = new ReentrantLock();
- private Condition configInSync = configLock.newCondition();
private final CountDownLatch startupLatch;
+ private HarakiriMonitor monitor;
- private void setupHarakiriMonitor(String host) {
+ public void setupHarakiriMonitor(String host, boolean terminate) throws IOException {
int idx = host.indexOf(':');
String h = host.substring(0, idx);
int p = Integer.parseInt(host.substring(idx + 1));
- try {
- HarakiriMonitor m = new HarakiriMonitor(h, p, true, this);
- Thread t = new Thread(m, "HarakiriMonitor");
- t.start();
- } catch (Throwable t) {
- System.err.println("Failed to set up harakiri monitor: " + t.getMessage());
- System.exit(1);
- }
+ monitor = new HarakiriMonitor(h, p, terminate, this);
+ monitor.start();
}
/**
@@ -103,13 +94,18 @@ private void respawn(String bucketName, int idx) {
}
}
- public static class HarakiriMonitor implements Runnable {
+ public HarakiriMonitor getMonitor() {
+ return monitor;
+ }
+
+ public static class HarakiriMonitor extends Observable implements Runnable {
private final boolean terminate;
- private final Socket sock;
- private final BufferedReader input;
private final CouchbaseMock mock;
- private final OutputStream output;
+ private BufferedReader input;
+ private OutputStream output;
+ private Socket sock;
+ private Thread thread;
public HarakiriMonitor(String host, int port, boolean terminate, CouchbaseMock mock) throws IOException {
this.mock = mock;
@@ -119,6 +115,17 @@ public HarakiriMonitor(String host, int port, boolean terminate, CouchbaseMock m
output = sock.getOutputStream();
}
+ public void start()
+ {
+ thread = new Thread(this, "HarakiriMonitor");
+ thread.start();
+ }
+
+ public void stop()
+ {
+ thread.interrupt();
+ }
+
@Override
public void run() {
boolean closed = false;
@@ -158,8 +165,12 @@ public void run() {
}
if ("failover".equals(command)) {
mock.failover(bucket, idx);
+ setChanged();
+ notifyObservers();
} else if ("respawn".equals(command)) {
mock.respawn(bucket, idx);
+ setChanged();
+ notifyObservers();
}
} catch (NumberFormatException ex) {
}
@@ -306,7 +317,7 @@ public static void main(String[] args) {
}
CouchbaseMock mock = new CouchbaseMock(hostname, port, nodes, vbuckets, bucketsSpec);
if (harakirimonitor != null) {
- mock.setupHarakiriMonitor(harakirimonitor);
+ mock.setupHarakiriMonitor(harakirimonitor, true);
}
mock.start();
} catch (Exception e) {
@@ -314,27 +325,6 @@ public static void main(String[] args) {
}
}
- public void configUpdated() {
- configLock.lock();
- try {
- configInSync.signalAll();
- } finally {
- configLock.unlock();
- }
- }
-
- public boolean waitForUpdate() {
- configLock.lock();
- try {
- configInSync.await();
- } catch (InterruptedException ex) {
- return false;
- } finally {
- configLock.unlock();
- }
- return true;
- }
-
public void failSome(String name, float percentage) {
Bucket bucket = getBuckets().get(name);
if (bucket != null) {
View
57 src/main/java/org/couchbase/mock/http/PoolsHandler.java
@@ -15,6 +15,9 @@
*/
package org.couchbase.mock.http;
+import java.util.Observable;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.couchbase.mock.CouchbaseMock;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
@@ -24,6 +27,8 @@
import java.net.HttpURLConnection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Observer;
+import java.util.concurrent.CountDownLatch;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import org.couchbase.mock.Bucket;
@@ -34,6 +39,33 @@
*/
public class PoolsHandler implements HttpHandler {
+ private class ConfigObserver implements Observer {
+ private final OutputStream output;
+ private final Bucket bucket;
+ private final CountDownLatch complete;
+
+ public ConfigObserver(Bucket bucket, OutputStream output, CountDownLatch complete) {
+ this.bucket = bucket;
+ this.output = output;
+ this.complete = complete;
+ }
+
+ @Override
+ public void update(Observable o, Object arg) {
+ try {
+ byte[] data = bucket.getJSON().getBytes();
+ int n = data.length;
+ output.write(data);
+ output.write("\n\n\n\n".getBytes());
+ output.flush();
+ } catch (IOException ex) {
+ o.deleteObserver(this);
+ complete.countDown();
+ }
+ }
+
+ }
+
private final CouchbaseMock mock;
public PoolsHandler(CouchbaseMock mock) {
@@ -46,6 +78,7 @@ public void handle(HttpExchange exchange) throws IOException {
OutputStream body = exchange.getResponseBody();
String bucketName = exchange.getPrincipal().getName();
byte[] payload;
+ boolean chunked = false;
if (path.matches("^/pools/?$")) {
// GET /pools
@@ -83,16 +116,30 @@ public void handle(HttpExchange exchange) throws IOException {
// GET /pools/:poolName/bucketsStreaming/:bucketName
String[] tokens = path.split("/");
Bucket bucket = mock.getBuckets().get(tokens[tokens.length - 1]);
- exchange.sendResponseHeaders(HttpURLConnection.HTTP_OK, 0);
- do {
- body.write(bucket.getJSON().getBytes());
+ if (bucket != null) {
+ exchange.sendResponseHeaders(HttpURLConnection.HTTP_OK, 0);
+ chunked = true;
+ byte[] data = bucket.getJSON().getBytes();
+ body.write(data);
body.write("\n\n\n\n".getBytes());
body.flush();
- } while (mock.waitForUpdate());
+ CountDownLatch completed = new CountDownLatch(1);
+ ConfigObserver observer = new ConfigObserver(bucket, body, completed);
+ mock.getMonitor().addObserver(observer);
+ try {
+ completed.await();
+ } catch (InterruptedException ex) {
+ exchange.sendResponseHeaders(HttpURLConnection.HTTP_INTERNAL_ERROR, -1);
+ }
+ } else {
+ exchange.sendResponseHeaders(HttpURLConnection.HTTP_NOT_FOUND, -1);
+ }
} else {
exchange.sendResponseHeaders(HttpURLConnection.HTTP_NOT_FOUND, -1);
}
- body.close();
+ if (!chunked) {
+ body.close();
+ }
}
View
6 src/main/java/org/couchbase/mock/memcached/MemcachedServer.java
@@ -279,16 +279,10 @@ BinaryProtocolHandler getProtocolHandler() {
public void shutdown() {
active = false;
- if (cluster != null) {
- cluster.configUpdated();
- }
}
public void startup() {
active = true;
- if (cluster != null) {
- cluster.configUpdated();
- }
}
/**
View
25 src/test/java/org/couchbase/mock/JMembaseTest.java
@@ -264,22 +264,17 @@ public void testHarakiriMonitor() throws IOException {
}
}
- private String readConfig(InputStream stream) {
+ private String readConfig(InputStream stream) throws IOException {
int bb, lf = 0;
StringBuilder cfg = new StringBuilder();
do {
- try {
- bb = stream.read();
- if (bb == '\n') {
- lf++;
- } else {
- lf = 0;
- cfg.append(bb);
- }
- } catch (IOException ex) {
- Logger.getLogger(JMembaseTest.class.getName()).log(Level.SEVERE, null, ex);
- return null;
+ bb = stream.read();
+ if (bb == '\n') {
+ lf++;
+ } else {
+ lf = 0;
+ cfg.append(bb);
}
} while (lf < 4);
return cfg.toString();
@@ -288,9 +283,7 @@ private String readConfig(InputStream stream) {
public void testConfigStreaming() throws IOException {
System.out.println("testConfigStreaming");
ServerSocket server = new ServerSocket(0);
- CouchbaseMock.HarakiriMonitor m = new CouchbaseMock.HarakiriMonitor(null, server.getLocalPort(), true, instance);
- Thread t = new Thread(m, "HarakiriMonitor");
- t.start();
+ instance.setupHarakiriMonitor("localhost:" + server.getLocalPort(), false);
Socket client = server.accept();
InputStream cin = client.getInputStream();
OutputStream cout = client.getOutputStream();
@@ -323,6 +316,6 @@ public void testConfigStreaming() throws IOException {
assertEquals(100, defaultBucket.activeServers().size());
server.close();
- t.interrupt();
+ instance.getMonitor().stop();
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.