Permalink
Browse files

Use TapStream abstraction in spymemcached

This will allow our tap streams to be able to connect to
all node in the cluster.

Change-Id: I233a8fc96bb55cf12e73e40f46281dd3870004b5
Reviewed-on: http://review.couchbase.org/13083
Reviewed-by: Matt Ingenthron <matt@couchbase.com>
Reviewed-by: Raghavan N. Srinivas <raghavan.srinivas@gmail.com>
Tested-by: Matt Ingenthron <matt@couchbase.com>
  • Loading branch information...
Mike Wiederhold authored and ingenthr committed Feb 8, 2012
1 parent 4a38b26 commit 7d9b64af066781a83ae3e02a02427a1ea61e37ae
@@ -32,14 +32,16 @@
import javax.naming.ConfigurationException;
+import net.spy.memcached.BroadcastOpFactory;
+import net.spy.memcached.MemcachedNode;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationCallback;
-import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.TapOperation;
import net.spy.memcached.tapmessage.RequestMessage;
import net.spy.memcached.tapmessage.ResponseMessage;
import net.spy.memcached.tapmessage.TapOpcode;
+import net.spy.memcached.tapmessage.TapStream;
/**
* A tap client for Couchbase server.
@@ -99,7 +101,7 @@ public ResponseMessage getNextMessage(long time, TimeUnit timeunit) {
return (ResponseMessage) m;
} else if (m instanceof TapAck) {
TapAck ack = (TapAck) m;
- tapAck(ack.getConn(), ack.getOpcode(), ack.getOpaque(),
+ tapAck(ack.getConn(), ack.getNode(), ack.getOpcode(), ack.getOpaque(),
ack.getCallback());
return null;
} else {
@@ -123,10 +125,10 @@ public boolean hasMoreMessages() {
return true;
} else {
synchronized (omap) {
- Iterator<Operation> itr = omap.keySet().iterator();
+ Iterator<TapStream> itr = omap.keySet().iterator();
while (itr.hasNext()) {
- Operation op = itr.next();
- if (op.getState().equals(OperationState.COMPLETE) || op.isCancelled()
+ TapStream op = itr.next();
+ if (op.isCompleted() || op.isCancelled()
|| op.hasErrored()) {
omap.get(op).shutdown();
omap.remove(op);
@@ -152,35 +154,38 @@ public boolean hasMoreMessages() {
* Couchbase cluster.
* @throws IOException if there are errors connecting to the cluster.
*/
- public Operation tapCustom(String id, RequestMessage message)
+ public TapStream tapCustom(final String id, final RequestMessage message)
throws ConfigurationException, IOException {
final TapConnectionProvider conn = new TapConnectionProvider(baseList,
bucketName, pwd);
-
- final CountDownLatch latch = new CountDownLatch(1);
- final Operation op = conn.getOpFactory().tapCustom(id, message,
- new TapOperation.Callback() {
- public void receivedStatus(OperationStatus status) {
- }
-
- public void gotData(ResponseMessage tapMessage) {
- rqueue.add(tapMessage);
- messagesRead++;
- }
-
- public void gotAck(TapOpcode opcode, int opaque) {
- rqueue.add(new TapAck(conn, opcode, opaque, this));
- }
-
- public void complete() {
- latch.countDown();
- }
- });
+ final TapStream ts = new TapStream();
+ conn.broadcastOp(new BroadcastOpFactory() {
+ public Operation newOp(final MemcachedNode n,
+ final CountDownLatch latch) {
+ Operation op = conn.getOpFactory().tapCustom(id, message,
+ new TapOperation.Callback() {
+ public void receivedStatus(OperationStatus status) {
+ }
+ public void gotData(ResponseMessage tapMessage) {
+ rqueue.add(tapMessage);
+ messagesRead++;
+ }
+ public void gotAck(MemcachedNode node, TapOpcode opcode,
+ int opaque) {
+ rqueue.add(new TapAck(conn, node, opcode, opaque, this));
+ }
+ public void complete() {
+ latch.countDown();
+ }
+ });
+ ts.addOp((TapOperation)op);
+ return op;
+ }
+ });
synchronized (omap) {
- omap.put(op, conn);
+ omap.put(ts, conn);
}
- conn.addOp(op);
- return op;
+ return ts;
}
/**
@@ -197,7 +202,7 @@ public void complete() {
* Couchbase cluster.
* @throws IOException If there are errors connecting to the cluster.
*/
- public Operation tapBackfill(String id, final int runTime,
+ public TapStream tapBackfill(String id, final int runTime,
final TimeUnit timeunit) throws IOException, ConfigurationException {
return tapBackfill(id, -1, runTime, timeunit);
}
@@ -218,35 +223,38 @@ public Operation tapBackfill(String id, final int runTime,
* Couchbase cluster.
* @throws IOException If there are errors connecting to the cluster.
*/
- public Operation tapBackfill(final String id, final long date,
+ public TapStream tapBackfill(final String id, final long date,
final int runTime, final TimeUnit timeunit) throws IOException,
ConfigurationException {
final TapConnectionProvider conn = new TapConnectionProvider(baseList,
bucketName, pwd);
-
- final CountDownLatch latch = new CountDownLatch(1);
- final Operation op = conn.getOpFactory().tapBackfill(id, date,
- new TapOperation.Callback() {
- public void receivedStatus(OperationStatus status) {
- }
-
- public void gotData(ResponseMessage tapMessage) {
- rqueue.add(tapMessage);
- messagesRead++;
- }
-
- public void gotAck(TapOpcode opcode, int opaque) {
- rqueue.add(new TapAck(conn, opcode, opaque, this));
- }
-
- public void complete() {
- latch.countDown();
- }
- });
+ final TapStream ts = new TapStream();
+ conn.broadcastOp(new BroadcastOpFactory() {
+ public Operation newOp(final MemcachedNode n,
+ final CountDownLatch latch) {
+ Operation op = conn.getOpFactory().tapBackfill(id, date,
+ new TapOperation.Callback() {
+ public void receivedStatus(OperationStatus status) {
+ }
+ public void gotData(ResponseMessage tapMessage) {
+ rqueue.add(tapMessage);
+ messagesRead++;
+ }
+ public void gotAck(MemcachedNode node, TapOpcode opcode,
+ int opaque) {
+ rqueue.add(new TapAck(conn, node, opcode, opaque, this));
+ }
+ public void complete() {
+ latch.countDown();
+ }
+ });
+ ts.addOp((TapOperation)op);
+ return op;
+ }
+ });
synchronized (omap) {
- omap.put(op, conn);
+ omap.put(ts, conn);
}
- conn.addOp(op);
if (runTime > 0) {
Runnable r = new Runnable() {
@@ -259,13 +267,13 @@ public void run() {
}
conn.shutdown();
synchronized (omap) {
- omap.remove(op);
+ omap.remove(ts);
}
}
};
new Thread(r).start();
}
- return op;
+ return ts;
}
/**
@@ -279,46 +287,52 @@ public void run() {
* Couchbase cluster.
* @throws IOException If there are errors connecting to the cluster.
*/
- public Operation tapDump(final String id) throws IOException,
+ public TapStream tapDump(final String id) throws IOException,
ConfigurationException {
final TapConnectionProvider conn = new TapConnectionProvider(baseList,
bucketName, pwd);
-
- final CountDownLatch latch = new CountDownLatch(1);
- final Operation op = conn.getOpFactory().tapDump(id,
- new TapOperation.Callback() {
- public void receivedStatus(OperationStatus status) {
- }
- public void gotData(ResponseMessage tapMessage) {
- rqueue.add(tapMessage);
- messagesRead++;
- }
- public void gotAck(TapOpcode opcode, int opaque) {
- rqueue.add(new TapAck(conn, opcode, opaque, this));
- }
- public void complete() {
- latch.countDown();
- }
- });
+ final TapStream ts = new TapStream();
+ conn.broadcastOp(new BroadcastOpFactory() {
+ public Operation newOp(final MemcachedNode n,
+ final CountDownLatch latch) {
+ Operation op = conn.getOpFactory().tapDump(id,
+ new TapOperation.Callback() {
+ public void receivedStatus(OperationStatus status) {
+ }
+ public void gotData(ResponseMessage tapMessage) {
+ rqueue.add(tapMessage);
+ messagesRead++;
+ }
+ public void gotAck(MemcachedNode node, TapOpcode opcode,
+ int opaque) {
+ rqueue.add(new TapAck(conn, node, opcode, opaque, this));
+ }
+ public void complete() {
+ latch.countDown();
+ }
+ });
+ ts.addOp((TapOperation)op);
+ return op;
+ }
+ });
synchronized (omap) {
- omap.put(op, conn);
+ omap.put(ts, conn);
}
- conn.addOp(op);
- return op;
+ return ts;
}
- private void tapAck(TapConnectionProvider conn, TapOpcode opcode, int opaque,
- OperationCallback cb) {
+ private void tapAck(TapConnectionProvider conn, MemcachedNode node,
+ TapOpcode opcode, int opaque, OperationCallback cb) {
final Operation op = conn.getOpFactory().tapAck(opcode, opaque, cb);
- conn.addOp(op);
+ conn.addTapAckOp(node, op);
}
/**
* Shuts down all tap streams that are currently running.
*/
public void shutdown() {
synchronized (omap) {
- for (Map.Entry<Operation, net.spy.memcached.TapConnectionProvider> me
+ for (Map.Entry<TapStream, net.spy.memcached.TapConnectionProvider> me
: omap.entrySet()) {
me.getValue().shutdown();
}
@@ -336,14 +350,16 @@ public long getMessagesRead() {
}
class TapAck {
- private TapConnectionProvider conn;
- private TapOpcode opcode;
- private int opaque;
- private OperationCallback cb;
-
- public TapAck(TapConnectionProvider conn, TapOpcode opcode, int opaque,
- OperationCallback cb) {
+ private final TapConnectionProvider conn;
+ private final MemcachedNode node;
+ private final TapOpcode opcode;
+ private final int opaque;
+ private final OperationCallback cb;
+
+ public TapAck(TapConnectionProvider conn, MemcachedNode node,
+ TapOpcode opcode, int opaque, OperationCallback cb) {
this.conn = conn;
+ this.node = node;
this.opcode = opcode;
this.opaque = opaque;
this.cb = cb;
@@ -353,6 +369,10 @@ public TapConnectionProvider getConn() {
return conn;
}
+ public MemcachedNode getNode() {
+ return node;
+ }
+
public TapOpcode getOpcode() {
return opcode;
}
@@ -22,6 +22,7 @@
package com.couchbase.client;
+import com.couchbase.client.vbucket.ConfigurationProvider;
import com.couchbase.client.vbucket.Reconfigurable;
import com.couchbase.client.vbucket.config.Bucket;
@@ -33,7 +34,6 @@
import net.spy.memcached.AddrUtil;
import net.spy.memcached.ConnectionObserver;
-import net.spy.memcached.ops.Operation;
/**
* A TapConnectionProvider for Couchbase Server.
@@ -42,6 +42,8 @@
extends net.spy.memcached.TapConnectionProvider
implements Reconfigurable {
+ private final ConfigurationProvider cp;
+
/**
* Get a tap connection based on the REST response from a Couchbase server.
*
@@ -67,11 +69,8 @@ public TapConnectionProvider(final List<URI> baseList,
public TapConnectionProvider(CouchbaseConnectionFactory cf)
throws IOException, ConfigurationException{
super(cf, AddrUtil.getAddresses(cf.getVBucketConfig().getServers()));
- cf.getConfigurationProvider().subscribe(cf.getBucketName(), this);
- }
-
- protected void addOp(final Operation op) {
- conn.enqueueOperation("TStream", op);
+ cp = cf.getConfigurationProvider();
+ cp.subscribe(cf.getBucketName(), this);
}
/**

0 comments on commit 7d9b64a

Please sign in to comment.