Skip to content

Commit

Permalink
Added ability to do tap dump
Browse files Browse the repository at this point in the history
Change-Id: I8d761b5cd1e0d5a44c1db5cbd62c74167ff51a64
Reviewed-on: http://review.couchbase.org/7898
Reviewed-by: Matt Ingenthron <matt@couchbase.com>
Tested-by: Matt Ingenthron <matt@couchbase.com>
  • Loading branch information
Mike Wiederhold authored and ingenthr committed Jul 19, 2011
1 parent b26ab48 commit eae60d3
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 1 deletion.
13 changes: 13 additions & 0 deletions src/main/java/net/spy/memcached/OperationFactory.java
Expand Up @@ -284,4 +284,17 @@ SASLStepOperation saslStep(String[] mech, byte[] challenge,
* @return a tap ack operation.
*/
TapOperation tapAck(TapOpcode opcode, int opaque, OperationCallback cb);

/**
* Sends a tap dump message to the server.
*
* See <a href="http://www.couchbase.org/wiki/display/membase/TAP+Protocol">
* http://www.couchbase.org/wiki/display/membase/TAP+Protocol</a> for more
* details on the tap protocol.
*
* @param id the name for the TAP connection
* @param cb the callback for the tap stream.
* @return a tap dump operation.
*/
TapOperation tapDump(String id, OperationCallback cb);
}
30 changes: 30 additions & 0 deletions src/main/java/net/spy/memcached/TapClient.java
Expand Up @@ -288,6 +288,36 @@ public void run() {
return op;
}

public Operation tapDump(final String id) throws IOException, ConfigurationException {
final TapConnectionProvider conn;
if (vBucketAware) {
conn = new TapConnectionProvider(baseList, bucketName, usr, pwd);
} else {
conn = new TapConnectionProvider(addrs);
}

final CountDownLatch latch=new CountDownLatch(1);
final Operation op=conn.opFact.tapDump(id, new TapOperation.Callback() {

public void receivedStatus(OperationStatus status) {
}
public void gotData(ResponseMessage tapMessage) {
rqueue.add(tapMessage);
messagesRead++;
}
public void gotAck(TapOpcode opcode, int opaque) {
tapAck(conn, opcode, opaque, this);
}
public void complete() {
latch.countDown();
}});
synchronized(omap) {
omap.put(op, conn);
}
conn.addOp(op);
return op;
}

private void tapAck(TapConnectionProvider conn, TapOpcode opcode, int opaque,
OperationCallback cb) {
final Operation op=conn.opFact.tapAck(opcode, opaque, cb);
Expand Down
Expand Up @@ -155,4 +155,10 @@ public TapOperation tapAck(TapOpcode opcode, int opaque, OperationCallback cb) {
" protocol");
}

@Override
public TapOperation tapDump(String id, OperationCallback cb) {
throw new UnsupportedOperationException("Tap is not supported for ASCII" +
" protocol");
}

}
Expand Up @@ -155,4 +155,8 @@ public TapOperation tapCustom(String id, RequestMessage message,
public TapOperation tapAck(TapOpcode opcode, int opaque, OperationCallback cb) {
return new TapAckOperationImpl(opcode, opaque, cb);
}

public TapOperation tapDump(String id, OperationCallback cb) {
return new TapDumpOperationImpl(id, cb);
}
}
@@ -0,0 +1,41 @@
package net.spy.memcached.protocol.binary;

import java.util.UUID;

import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.TapOperation;
import net.spy.memcached.tapmessage.RequestMessage;
import net.spy.memcached.tapmessage.TapFlag;
import net.spy.memcached.tapmessage.TapMagic;
import net.spy.memcached.tapmessage.TapOpcode;

public class TapDumpOperationImpl extends TapOperationImpl implements TapOperation {
private final String id;

TapDumpOperationImpl(String id, OperationCallback cb) {
super(cb);
this.id = id;
}

@Override
public void initialize() {
RequestMessage message = new RequestMessage();
message.setMagic(TapMagic.PROTOCOL_BINARY_REQ);
message.setOpcode(TapOpcode.REQUEST);
message.setFlags(TapFlag.DUMP);
message.setFlags(TapFlag.SUPPORT_ACK);
if (id != null) {
message.setName(id);
} else {
message.setName(UUID.randomUUID().toString());
}

setBuffer(message.getBytes());
}

@Override
public void streamClosed(OperationState state) {
transitionState(state);
}
}
34 changes: 33 additions & 1 deletion src/test/java/net/spy/memcached/TapTest.java
Expand Up @@ -10,6 +10,8 @@

public class TapTest extends ClientBaseCase {

private static final long TAP_DUMP_TIMEOUT = 2000;

@Override
protected void initClient() throws Exception {
initClient(new BinaryConnectionFactory() {
Expand All @@ -28,7 +30,6 @@ public void testBackfill() throws Exception {
if (TestConfig.isMembase()) {
TapClient tc = new TapClient(AddrUtil.getAddresses(TestConfig.IPV4_ADDR + ":11210"));
tc.tapBackfill(null, 5, TimeUnit.SECONDS);

HashMap<String, Boolean> items = new HashMap<String, Boolean>();
for (int i = 0; i < 25; i++) {
client.set("key" + i, 0, "value" + i);
Expand All @@ -51,6 +52,37 @@ public void testBackfill() throws Exception {
}
}

public void testTapDump() throws Exception {
if (TestConfig.isMembase()) {
TapClient tc = new TapClient(AddrUtil.getAddresses(TestConfig.IPV4_ADDR + ":11210"));

HashMap<String, Boolean> items = new HashMap<String, Boolean>();
for (int i = 0; i < 25; i++) {
client.set("key" + i, 0, "value" + i).get();
items.put("key" + i + ",value" + i, new Boolean(false));
}
tc.tapDump(null);

long st = System.currentTimeMillis();
while(tc.hasMoreMessages()) {
if ((System.currentTimeMillis() - st) > TAP_DUMP_TIMEOUT) {
fail("Tap dump took too long");
}
ResponseMessage m;
if ((m = tc.getNextMessage()) != null) {
String key = m.getKey() + "," + new String(m.getValue());
if (items.containsKey(key)) {
items.put(key, new Boolean(true));
} else {
fail();
}
}
}
checkTapKeys(items);
assertTrue(client.flush().get().booleanValue());
}
}

public void testTapBucketDoesNotExist() throws Exception {
if (TestConfig.isMembase()) {
TapClient client = new TapClient(Arrays.asList(
Expand Down

0 comments on commit eae60d3

Please sign in to comment.