Permalink
Browse files

Created TapStream abstraction

Previously when we created a tap stream we would only send it
to a single node in the memcached cluster. This fix creates
a tap stream abstraction that aggregates the tap streams
sent to each server and treats them as a single tap stream.

Change-Id: I68650e187eec5f79952dab1374bad0e07d5a6c30
Reviewed-on: http://review.couchbase.org/13081
Reviewed-by: Matt Ingenthron <matt@couchbase.com>
Tested-by: Michael Wiederhold <mike@couchbase.com>
  • Loading branch information...
1 parent c3f32fb commit 680f227582aab17b75a6ef6eaf9e887c65d8a8ea Mike Wiederhold committed with mikewied Feb 8, 2012
@@ -38,19 +38,19 @@
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationCallback;
-import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.TapOperation;
import net.spy.memcached.tapmessage.RequestMessage;
import net.spy.memcached.tapmessage.ResponseMessage;
import net.spy.memcached.tapmessage.TapOpcode;
+import net.spy.memcached.tapmessage.TapStream;
/**
* A tap client for memcached.
*/
public class TapClient {
protected BlockingQueue<Object> rqueue;
- protected HashMap<Operation, TapConnectionProvider> omap;
+ protected final HashMap<TapStream, TapConnectionProvider> omap;
protected long messagesRead;
private List<InetSocketAddress> addrs;
@@ -78,7 +78,7 @@ public TapClient(InetSocketAddress... ia) {
*/
public TapClient(List<InetSocketAddress> addrs) {
this.rqueue = new LinkedBlockingQueue<Object>();
- this.omap = new HashMap<Operation, TapConnectionProvider>();
+ this.omap = new HashMap<TapStream, TapConnectionProvider>();
this.addrs = addrs;
this.messagesRead = 0;
}
@@ -110,7 +110,7 @@ public ResponseMessage getNextMessage(long time, TimeUnit timeunit) {
return (ResponseMessage) m;
} else if (m instanceof TapAck) {
TapAck ack = (TapAck) m;
- tapAck(ack.getConn(), ack.getOpcode(), ack.getOpaque(),
+ tapAck(ack.getConn(), ack.getNode(), ack.getOpcode(), ack.getOpaque(),
ack.getCallback());
return null;
} else {
@@ -134,13 +134,12 @@ public boolean hasMoreMessages() {
return true;
} else {
synchronized (omap) {
- Iterator<Operation> itr = omap.keySet().iterator();
+ Iterator<TapStream> itr = omap.keySet().iterator();
while (itr.hasNext()) {
- Operation op = itr.next();
- if (op.getState().equals(OperationState.COMPLETE) || op.isCancelled()
- || op.hasErrored()) {
- omap.get(op).shutdown();
- omap.remove(op);
+ TapStream ts = itr.next();
+ if (ts.isCompleted() || ts.isCancelled() || ts.hasErrored()) {
+ omap.get(ts).shutdown();
+ omap.remove(ts);
}
}
if (omap.size() > 0) {
@@ -163,33 +162,37 @@ public boolean hasMoreMessages() {
* memcached cluster.
* @throws IOException if there are errors connecting to the cluster.
*/
- public Operation tapCustom(String id, RequestMessage message)
+ public TapStream tapCustom(final String id, final RequestMessage message)
throws ConfigurationException, IOException {
final TapConnectionProvider conn = new TapConnectionProvider(addrs);
- final CountDownLatch latch = new CountDownLatch(1);
- final Operation op = conn.getOpFactory().tapCustom(id, message,
- new TapOperation.Callback() {
- public void receivedStatus(OperationStatus status) {
- }
-
- public void gotData(ResponseMessage tapMessage) {
- rqueue.add(tapMessage);
- messagesRead++;
- }
-
- public void gotAck(TapOpcode opcode, int opaque) {
- rqueue.add(new TapAck(conn, opcode, opaque, this));
- }
-
- public void complete() {
- latch.countDown();
- }
- });
+ final TapStream ts = new TapStream();
+ conn.broadcastOp(new BroadcastOpFactory() {
+ public Operation newOp(final MemcachedNode n,
+ final CountDownLatch latch) {
+ Operation op = conn.getOpFactory().tapCustom(id, message,
+ new TapOperation.Callback() {
+ public void receivedStatus(OperationStatus status) {
+ }
+ public void gotData(ResponseMessage tapMessage) {
+ rqueue.add(tapMessage);
+ messagesRead++;
+ }
+ public void gotAck(MemcachedNode node, TapOpcode opcode,
+ int opaque) {
+ rqueue.add(new TapAck(conn, node, opcode, opaque, this));
+ }
+ public void complete() {
+ latch.countDown();
+ }
+ });
+ ts.addOp((TapOperation)op);
+ return op;
+ }
+ });
synchronized (omap) {
- omap.put(op, conn);
+ omap.put(ts, conn);
}
- conn.addOp(op);
- return op;
+ return ts;
}
/**
@@ -203,44 +206,51 @@ public void complete() {
* memcached cluster.
* @throws IOException If there are errors connecting to the cluster.
*/
- public Operation tapDump(final String id) throws IOException,
+ public TapStream tapDump(final String id) throws IOException,
ConfigurationException {
final TapConnectionProvider conn = new TapConnectionProvider(addrs);
- final CountDownLatch latch = new CountDownLatch(1);
- final Operation op = conn.getOpFactory().tapDump(id,
- new TapOperation.Callback() {
- public void receivedStatus(OperationStatus status) {
- }
- public void gotData(ResponseMessage tapMessage) {
- rqueue.add(tapMessage);
- messagesRead++;
- }
- public void gotAck(TapOpcode opcode, int opaque) {
- rqueue.add(new TapAck(conn, opcode, opaque, this));
- }
- public void complete() {
- latch.countDown();
- }
- });
+ final TapStream ts = new TapStream();
+ conn.broadcastOp(new BroadcastOpFactory() {
+ public Operation newOp(final MemcachedNode n,
+ final CountDownLatch latch) {
+ Operation op = conn.getOpFactory().tapDump(id,
+ new TapOperation.Callback() {
+ public void receivedStatus(OperationStatus status) {
+ }
+ public void gotData(ResponseMessage tapMessage) {
+ rqueue.add(tapMessage);
+ messagesRead++;
+ }
+ public void gotAck(MemcachedNode node, TapOpcode opcode,
+ int opaque) {
+ rqueue.add(new TapAck(conn, node, opcode, opaque, this));
+ }
+ public void complete() {
+ latch.countDown();
+ }
+ });
+ ts.addOp((TapOperation)op);
+ return op;
+ }
+ });
synchronized (omap) {
- omap.put(op, conn);
+ omap.put(ts, conn);
}
- conn.addOp(op);
- return op;
+ return ts;
}
- private void tapAck(TapConnectionProvider conn, TapOpcode opcode, int opaque,
- OperationCallback cb) {
+ private void tapAck(TapConnectionProvider conn, MemcachedNode node,
+ TapOpcode opcode, int opaque, OperationCallback cb) {
final Operation op = conn.getOpFactory().tapAck(opcode, opaque, cb);
- conn.addOp(op);
+ conn.addTapAckOp(node, op);
}
/**
* Shuts down all tap streams that are currently running.
*/
public void shutdown() {
synchronized (omap) {
- for (Map.Entry<Operation, TapConnectionProvider> me : omap.entrySet()) {
+ for (Map.Entry<TapStream, TapConnectionProvider> me : omap.entrySet()) {
me.getValue().shutdown();
}
}
@@ -257,14 +267,16 @@ public long getMessagesRead() {
}
class TapAck {
- private TapConnectionProvider conn;
- private TapOpcode opcode;
- private int opaque;
- private OperationCallback cb;
+ private final TapConnectionProvider conn;
+ private final TapOpcode opcode;
+ private final int opaque;
+ private final MemcachedNode node;
+ private final OperationCallback cb;
- public TapAck(TapConnectionProvider conn, TapOpcode opcode, int opaque,
- OperationCallback cb) {
+ public TapAck(TapConnectionProvider conn, MemcachedNode node,
+ TapOpcode opcode, int opaque, OperationCallback cb) {
this.conn = conn;
+ this.node = node;
this.opcode = opcode;
this.opaque = opaque;
this.cb = cb;
@@ -274,6 +286,10 @@ public TapConnectionProvider getConn() {
return conn;
}
+ public MemcachedNode getNode() {
+ return node;
+ }
+
public TapOpcode getOpcode() {
return opcode;
}
@@ -112,8 +112,15 @@ public TapConnectionProvider(ConnectionFactory cf,
}
}
- protected void addOp(final Operation op) {
- conn.enqueueOperation("TStream", op);
+ public void addTapAckOp(MemcachedNode node, final Operation op) {
+ conn.addOperation(node, op);
+ }
+
+ public CountDownLatch broadcastOp(final BroadcastOpFactory of) {
+ if (shuttingDown) {
+ throw new IllegalStateException("Shutting down");
+ }
+ return conn.broadcastOperation(of, conn.getLocator().getAll());
}
/**
@@ -250,15 +257,6 @@ public void receivedStatus(OperationStatus s) {
}
}
- CountDownLatch broadcastOp(final BroadcastOpFactory of) {
- return broadcastOp(of, conn.getLocator().getAll(), true);
- }
-
- CountDownLatch broadcastOp(final BroadcastOpFactory of,
- Collection<MemcachedNode> nodes) {
- return broadcastOp(of, nodes, true);
- }
-
private CountDownLatch broadcastOp(BroadcastOpFactory of,
Collection<MemcachedNode> nodes, boolean checkShuttingDown) {
if (checkShuttingDown && shuttingDown) {
@@ -22,6 +22,7 @@
package net.spy.memcached.ops;
+import net.spy.memcached.MemcachedNode;
import net.spy.memcached.tapmessage.ResponseMessage;
import net.spy.memcached.tapmessage.TapOpcode;
@@ -41,7 +42,7 @@
*/
void gotData(ResponseMessage message);
- void gotAck(TapOpcode opcode, int opaque);
+ void gotAck(MemcachedNode node, TapOpcode opcode, int opaque);
}
void streamClosed(OperationState state);
@@ -75,8 +75,8 @@ public void readFromBuffer(ByteBuffer data) throws IOException {
ResponseMessage response = new ResponseMessage(message);
for (TapResponseFlag flag : response.getFlags()) {
if (flag == TapResponseFlag.TAP_ACK) {
- ((Callback) getCallback()).gotAck(response.getOpcode(),
- response.getOpaque());
+ ((Callback) getCallback()).gotAck(getHandlingNode(),
+ response.getOpcode(), response.getOpaque());
}
}
if (response.getOpcode() != TapOpcode.OPAQUE && response.getOpcode()
Oops, something went wrong.

0 comments on commit 680f227

Please sign in to comment.