Permalink
Browse files

Changes for Observe.

Change-Id: I94a0709e0856b5dd72d108feec9574b0836b4fbf
Reviewed-on: http://review.couchbase.org/19537
Reviewed-by: Matt Ingenthron <matt@couchbase.com>
Tested-by: Matt Ingenthron <matt@couchbase.com>
  • Loading branch information...
1 parent 43f3ad8 commit 9455c1ae7935d62581ced73397ce9d89a40645d6 @ragsns ragsns committed with ingenthr Aug 13, 2012
@@ -260,11 +260,11 @@ public NodeLocator getNodeLocator() {
return transcoder;
}
- CountDownLatch broadcastOp(final BroadcastOpFactory of) {
+ public CountDownLatch broadcastOp(final BroadcastOpFactory of) {
return broadcastOp(of, mconn.getLocator().getAll(), true);
}
- CountDownLatch broadcastOp(final BroadcastOpFactory of,
+ public CountDownLatch broadcastOp(final BroadcastOpFactory of,
Collection<MemcachedNode> nodes) {
return broadcastOp(of, nodes, true);
}
@@ -729,6 +729,7 @@ public CountDownLatch broadcastOperation(final BroadcastOpFactory of,
Collection<MemcachedNode> nodes) {
final CountDownLatch latch = new CountDownLatch(locator.getAll().size());
for (MemcachedNode node : nodes) {
+ getLogger().debug("broadcast Operation: node = " + node);
Operation op = of.newOp(node, latch);
op.initialize();
node.addOp(op);
@@ -0,0 +1,64 @@
+/**
+ * Copyright (C) 2006-2009 Dustin Sallings
+ * Copyright (C) 2009-2012 Couchbase, Inc.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALING
+ * IN THE SOFTWARE.
+ */
+
+package net.spy.memcached;
+
+/**
+ * Response codes for a Observe operation.
+ */
+public enum ObserveResponse {
+ /**
+ * Response indicating the key was uninitialized.
+ */
+ UNINITIALIZED((byte) 0xff),
+ /**
+ * Response indicating the key was modified.
+ */
+ MODIFIED((byte) 0xfe),
+ /**
+ * Response indicating the key was persisted.
+ */
+ FOUND_PERSISTED((byte) 0x01),
+ /**
+ * Response indicating the key was found but not persisted.
+ */
+ FOUND_NOT_PERSISTED((byte) 0x00),
+ /**
+ * Response indicating the key was not found and persisted, as in
+ * the case of deletes - a real delete.
+ */
+ NOT_FOUND_PERSISTED((byte) 0x80),
+ /**
+ * Response indicating the key was not found and not
+ * persisted, as in the case of deletes - a logical delete.
+ */
+ NOT_FOUND_NOT_PERSISTED((byte) 0x11);
+
+ private final byte value;
+
+
+ ObserveResponse(byte b) {
+ value = b;
+ }
+
+}
@@ -41,6 +41,7 @@
import net.spy.memcached.ops.Mutator;
import net.spy.memcached.ops.MutatorOperation;
import net.spy.memcached.ops.NoopOperation;
+import net.spy.memcached.ops.ObserveOperation;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.SASLAuthOperation;
@@ -80,7 +81,7 @@
/**
* Create a Unlock operation.
*
- * @param key the key to delete
+ * @param key the key to unlock
* @param casId the value of CAS
* @param operationCallback the status callback
* @return the new UnlockOperation
@@ -89,6 +90,18 @@ UnlockOperation unlock(String key, long casId,
OperationCallback operationCallback);
/**
+ * Create an Observe operation.
+ *
+ * @param key the key to observe
+ * @param casId the value of CAS
+ * @param index the VBucket index of key
+ * @param operationCallback the status callback
+ * @return the new ObserveOperation
+ */
+ ObserveOperation observe(String key, long casId, int index,
+ ObserveOperation.Callback operationCallback);
+
+ /**
* Create a flush operation.
*
* @param delay delay until flush.
@@ -0,0 +1,49 @@
+/**
+ * Copyright (C) 2006-2009 Dustin Sallings
+ * Copyright (C) 2009-2012 Couchbase, Inc.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALING
+ * IN THE SOFTWARE.
+ */
+package net.spy.memcached;
+
+/**
+ * PersistTo codes for a Observe operation.
+ */
+public enum PersistTo {
+ /**
+ * Persist to the Master. ONE implies MASTER.
+ */
+ MASTER,
+ /**
+ * ONE implies MASTER.
+ */
+ ONE,
+ /**
+ * Persist to at least two nodes including Master.
+ */
+ TWO,
+ /**
+ * Persist to at least three nodes including Master.
+ */
+ THREE,
+ /**
+ * Persist to at least four nodes including Master.
+ */
+ FOUR
+}
@@ -0,0 +1,45 @@
+/**
+ * Copyright (C) 2006-2009 Dustin Sallings
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALING
+ * IN THE SOFTWARE.
+ */
+
+package net.spy.memcached;
+
+/**
+ * ReplicateTo codes for a Observe operation.
+ */
+public enum ReplicateTo {
+ /**
+ * Replicate to at least zero nodes.
+ */
+ ZERO,
+ /**
+ * Replicate to at least one node.
+ */
+ ONE,
+ /**
+ * Replicate to at least two nodes.
+ */
+ TWO,
+ /**
+ * Replicate to at least three nodes.
+ */
+ THREE
+}
@@ -0,0 +1,44 @@
+/**
+ * Copyright (C) 2009-2012 Couchbase, Inc.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALING
+ * IN THE SOFTWARE.
+ */
+
+package net.spy.memcached.ops;
+
+import net.spy.memcached.ObserveResponse;
+
+/**
+ * Observe operation.
+ */
+public interface ObserveOperation extends KeyedOperation {
+ /**
+ * Operation callback for the Observe request.
+ */
+ interface Callback extends OperationCallback {
+ /**
+ * Callback for each result from a observe.
+ *
+ * @param key the key that was retrieved
+ * @param cas the CAS value for this record
+ * @param or the ObserveResponse
+ */
+ void gotData(String key, long cas, ObserveResponse or);
+ }
+}
@@ -44,6 +44,7 @@
import net.spy.memcached.ops.Mutator;
import net.spy.memcached.ops.MutatorOperation;
import net.spy.memcached.ops.NoopOperation;
+import net.spy.memcached.ops.ObserveOperation;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationCallback;
@@ -90,6 +91,12 @@ public GetlOperation getl(String key, int exp, GetlOperation.Callback cb) {
return new GetlOperationImpl(key, exp, cb);
}
+ public ObserveOperation observe(String key, long casId, int index,
+ ObserveOperation.Callback cb) {
+ throw new UnsupportedOperationException("Observe is not supported "
+ + "for ASCII protocol");
+ }
+
public UnlockOperation unlock(String key, long casId,
OperationCallback cb) {
return new UnlockOperationImpl(key, casId, cb);
@@ -46,6 +46,7 @@
import net.spy.memcached.ops.Mutator;
import net.spy.memcached.ops.MutatorOperation;
import net.spy.memcached.ops.NoopOperation;
+import net.spy.memcached.ops.ObserveOperation;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.SASLAuthOperation;
@@ -74,6 +75,10 @@ public UnlockOperation unlock(String key, long casId,
OperationCallback cb) {
return new UnlockOperationImpl(key, casId, cb);
}
+ public ObserveOperation observe(String key, long casId, int index,
+ ObserveOperation.Callback cb) {
+ return new ObserveOperationImpl(key, casId, index, cb);
+ }
public FlushOperation flush(int delay, OperationCallback cb) {
return new FlushOperationImpl(cb);
}
@@ -0,0 +1,72 @@
+/**
+ * Copyright (C) 2006-2009 Dustin Sallings
+ * Copyright (C) 2009-2012 Couchbase, Inc.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALING
+ * IN THE SOFTWARE.
+ */
+
+
+package net.spy.memcached.protocol.binary;
+
+import net.spy.memcached.ObserveResponse;
+import net.spy.memcached.ops.ObserveOperation;
+import net.spy.memcached.ops.OperationCallback;
+
+class ObserveOperationImpl extends SingleKeyOperationImpl implements
+ ObserveOperation {
+
+ private static final byte CMD = (byte) 0x92;
+
+ private final long cas;
+ private final String key;
+ private final int index;
+ private byte keystate = (byte)0xff;
+ private long retCas = 0;
+
+ public ObserveOperationImpl(String k, long c, int i,
+ OperationCallback cb) {
+ super(CMD, generateOpaque(), k, cb);
+ cas = c;
+ key = k;
+ index = i;
+ }
+
+ @Override
+ public void initialize() {
+ prepareBuffer("", 0x0, EMPTY_BYTES, (short) index,
+ (short) key.length(), key.getBytes());
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + " Cas: " + cas;
+ }
+
+ @Override
+ protected void decodePayload(byte[] pl) {
+ final short keylen = (short) decodeShort(pl, 2);
+ keystate = (byte) decodeByte(pl, keylen+4);
+ retCas = (long) decodeLong(pl, keylen+5);
+
+ ObserveResponse r = ObserveResponse.values()[keystate];
+
+ ((ObserveOperation.Callback) getCallback()).gotData(key, retCas, r);
+ getCallback().receivedStatus(STATUS_OK);
+ }
+}
Oops, something went wrong.

0 comments on commit 9455c1a

Please sign in to comment.