Skip to content

Commit

Permalink
Enhanced observe and added observe support for other mutation ops
Browse files Browse the repository at this point in the history
Throw ObservedException, ObservedTimeoutException
and ObservedModifiedException
Added support for add, replace, cas
Updated Tests for observing the mutation operations

Change-Id: Ifa9accbe8056f28ecfefaac19a9d5bdf34fb9951
Reviewed-on: http://review.couchbase.org/20329
Reviewed-by: Matt Ingenthron <matt@couchbase.com>
Tested-by: Matt Ingenthron <matt@couchbase.com>
  • Loading branch information
ragsns authored and ingenthr committed Aug 31, 2012
1 parent 710f398 commit 4b300b2
Show file tree
Hide file tree
Showing 6 changed files with 392 additions and 37 deletions.
247 changes: 217 additions & 30 deletions src/main/java/com/couchbase/client/CouchbaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@

import net.spy.memcached.AddrUtil;
import net.spy.memcached.BroadcastOpFactory;
import net.spy.memcached.CASResponse;
import net.spy.memcached.CASValue;
import net.spy.memcached.CachedData;
import net.spy.memcached.MemcachedClient;
Expand Down Expand Up @@ -817,23 +818,32 @@ public Boolean unlock(final String key,
*/
public OperationFuture<Boolean> delete(String key,
PersistTo req, ReplicateTo rep) {

OperationFuture<Boolean> deleteOp = delete(key);
boolean deleteStatus = false;

try {
deleteOp.get();
deleteOp.set(true, deleteOp.getStatus());
observePoll(key, 0L, req, rep);
deleteStatus = deleteOp.get();
} catch (InterruptedException e) {
deleteOp.set(false, deleteOp.getStatus());
deleteOp.set(false, new OperationStatus(false, "Delete get timed out"));
} catch (ExecutionException e) {
deleteOp.set(false, deleteOp.getStatus());
} catch (TimeoutException e) {
deleteOp.set(false, deleteOp.getStatus());
} catch (IllegalArgumentException e) {
deleteOp.set(false, deleteOp.getStatus());
} catch (RuntimeException e) {
deleteOp.set(false, deleteOp.getStatus());
deleteOp.set(false, new OperationStatus(false, "Delete get "
+ "execution exception "));
}
if (!deleteStatus) {
return deleteOp;
}
try {
observePoll(key, 0x0L, req, rep);
deleteOp.set(true, deleteOp.getStatus());
} catch (ObservedException e) {
deleteOp.set(false, new OperationStatus(false, e.getMessage()));
} catch (ObservedTimeoutException e) {
deleteOp.set(false, new OperationStatus(false, e.getMessage()));
} catch (ObservedModifiedException e) {
deleteOp.set(false, new OperationStatus(false, e.getMessage()));
}
return (deleteOp);
return deleteOp;
}
/**
* Delete a value with Observe.
Expand All @@ -860,23 +870,33 @@ public OperationFuture<Boolean> delete(String key, PersistTo req) {
*/
public OperationFuture<Boolean> set(String key, int exp,
String value, PersistTo req, ReplicateTo rep) {

OperationFuture<Boolean> setOp = set(key, exp, value);

boolean setStatus = false;

try {
if (setOp.get()) {
observePoll(key, setOp.getCas(), req, rep);
}
setStatus = setOp.get();
} catch (InterruptedException e) {
setOp.set(false, setOp.getStatus());
setOp.set(false, new OperationStatus(false, "Set get timed out"));
} catch (ExecutionException e) {
setOp.set(false, setOp.getStatus());
} catch (TimeoutException e) {
setOp.set(false, setOp.getStatus());
} catch (IllegalArgumentException e) {
setOp.set(false, setOp.getStatus());
} catch (RuntimeException e) {
setOp.set(false, setOp.getStatus());
setOp.set(false, new OperationStatus(false, "Set get "
+ "execution exception "));
}
if (!setStatus) {
return setOp;
}
try {
observePoll(key, 0x0L, req, rep);
setOp.set(true, setOp.getStatus());
} catch (ObservedException e) {
setOp.set(false, new OperationStatus(false, e.getMessage()));
} catch (ObservedTimeoutException e) {
setOp.set(false, new OperationStatus(false, e.getMessage()));
} catch (ObservedModifiedException e) {
setOp.set(false, new OperationStatus(false, e.getMessage()));
}
return (setOp);
return setOp;
}
/**
* Set a value with Observe.
Expand All @@ -892,6 +912,172 @@ public OperationFuture<Boolean> set(String key, int exp,
String value, PersistTo req) {
return set(key, exp, value, req, ReplicateTo.ZERO);
}
/**
* Add a value and Observe.
*
* @param key the key to set
* @param exp the Expiry value
* @param value the Key value
* @param req the Persistence to Master value
* @param rep the Persistence to Replicas
* @return whether or not the operation was performed
*
*/
public OperationFuture<Boolean> add(String key, int exp,
String value, PersistTo req, ReplicateTo rep) {

OperationFuture<Boolean> addOp = add(key, exp, value);

boolean addStatus = false;

try {
addStatus = addOp.get();
} catch (InterruptedException e) {
addOp.set(false, new OperationStatus(false, "Add get timed out"));
} catch (ExecutionException e) {
addOp.set(false, new OperationStatus(false, "Add get "
+ "execution exception "));
}
if (!addStatus) {
return addOp;
}
try {
observePoll(key, 0x0L, req, rep);
addOp.set(true, addOp.getStatus());
} catch (ObservedException e) {
addOp.set(false, new OperationStatus(false, e.getMessage()));
} catch (ObservedTimeoutException e) {
addOp.set(false, new OperationStatus(false, e.getMessage()));
} catch (ObservedModifiedException e) {
addOp.set(false, new OperationStatus(false, e.getMessage()));
}
return addOp;
}

/**
* Add a value with Observe.
*
* @param key the key to set
* @param exp the Expiry value
* @param value the Key value
* @param req the Persistence to Master value
* @return whether or not the operation was performed
*
*/
public OperationFuture<Boolean> add(String key, int exp,
String value, PersistTo req) {
return add(key, exp, value, req, ReplicateTo.ZERO);
}

/**
* Replace a value and Observe.
*
* @param key the key to set
* @param exp the Expiry value
* @param value the Key value
* @param req the Persistence to Master value
* @param rep the Persistence to Replicas
* @return whether or not the operation was performed
*
*/
public OperationFuture<Boolean> replace(String key, int exp,
String value, PersistTo req, ReplicateTo rep) {

OperationFuture<Boolean> replaceOp = replace(key, exp, value);

boolean replaceStatus = false;

try {
replaceStatus = replaceOp.get();
} catch (InterruptedException e) {
replaceOp.set(false, new OperationStatus(false, "Replace get timed out"));
} catch (ExecutionException e) {
replaceOp.set(false, new OperationStatus(false, "Replace get "
+ "execution exception "));
}
if (!replaceStatus) {
return replaceOp;
}
try {
observePoll(key, 0x0L, req, rep);
replaceOp.set(true, replaceOp.getStatus());
} catch (ObservedException e) {
replaceOp.set(false, new OperationStatus(false, e.getMessage()));
} catch (ObservedTimeoutException e) {
replaceOp.set(false, new OperationStatus(false, e.getMessage()));
} catch (ObservedModifiedException e) {
replaceOp.set(false, new OperationStatus(false, e.getMessage()));
}
return replaceOp;

}
/**
* Replace a value with Observe.
*
* @param key the key to set
* @param exp the Expiry value
* @param value the Key value
* @param req the Persistence to Master value
* @return whether or not the operation was performed
*
*/
public OperationFuture<Boolean> replace(String key, int exp,
String value, PersistTo req) {
return replace(key, exp, value, req, ReplicateTo.ZERO);
}

/**
* Set a value with a CAS and Observe.
*
* @param key the key to set
* @param cas the CAS value
* @param value the Key value
* @param req the Persistence to Master value
* @param rep the Persistence to Replicas
* @return whether or not the operation was performed
*
*/
public CASResponse cas(String key, long cas,
String value, PersistTo req, ReplicateTo rep) {

OperationFuture<CASResponse> casOp = asyncCAS(key, cas, value);
CASResponse casr = null;
try {
casr = casOp.get();
} catch (InterruptedException e) {
casr = CASResponse.EXISTS;
} catch (ExecutionException e) {
casr = CASResponse.EXISTS;
}
if (casr != CASResponse.OK) {
return casr;
}
try {
observePoll(key, casOp.getCas(), req, rep);
} catch (ObservedException e) {
casr = CASResponse.OBSERVE_ERROR_IN_ARGS;
} catch (ObservedTimeoutException e) {
casr = CASResponse.OBSERVE_TIMEOUT;
} catch (ObservedModifiedException e) {
casr = CASResponse.OBSERVE_MODIFIED;
}
return casr;
}
/**
* Set a value with a CAS and Observe.
*
* @param key the key to set
* @param casv the CAS value
* @param value the Key value
* @param req the Persistence to Master value
* @return whether or not the operation was performed
*
*/
public CASResponse cas(String key, long casv,
String value, PersistTo req) {
return cas(key, casv, value, req, ReplicateTo.ZERO);
}

/**
* Observe a key with a CAS.
*
Expand Down Expand Up @@ -1057,7 +1243,7 @@ public boolean shutdown(long timeout, TimeUnit unit) {
}

private void observePoll(String key, long cas,
PersistTo persist, ReplicateTo replicate) throws TimeoutException {
PersistTo persist, ReplicateTo replicate) {
int persists, replicates;
int totPersists, totReplicas;
boolean masterPersisted;
Expand Down Expand Up @@ -1099,7 +1285,7 @@ private void observePoll(String key, long cas,

if (replicates > replicas
|| persists > replicas) {
throw new IllegalArgumentException("Requested Persists and "
throw new ObservedException("Requested Persists and "
+ "Requested Replicates exceed number of replicas = "
+ replicas);
}
Expand All @@ -1118,7 +1304,8 @@ private void observePoll(String key, long cas,
masterPersisted = true;
}
if ((or[0]) == ObserveResponse.MODIFIED) { // Master
throw new RuntimeException("Observe - the key was modified");
throw new ObservedModifiedException("Observe - the key was modified on "
+ key);
}

for (int i=1; i < or.length; i++) {
Expand All @@ -1140,11 +1327,11 @@ private void observePoll(String key, long cas,
Thread.sleep(400);
} catch (InterruptedException e) {
getLogger().error("Interrupted while in observe loop.", e);
throw new ObservedException("Observe was Interrupted ");
}
} while (loop++ < 10);

throw new TimeoutException("Observe - Polled for Maximum retries of "
+ loop);
throw new ObservedTimeoutException("Observe Timeout - Polled"
+ " Unsuccessfully for over 4 seconds");
}

}
13 changes: 13 additions & 0 deletions src/main/java/com/couchbase/client/CouchbaseClientIF.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.util.concurrent.Future;

import net.spy.memcached.CASResponse;
import net.spy.memcached.CASValue;
import net.spy.memcached.MemcachedClientIF;
import net.spy.memcached.ObserveResponse;
Expand Down Expand Up @@ -67,6 +68,18 @@ OperationFuture<Boolean> set(String key, int exp,
String value, PersistTo persist);
OperationFuture<Boolean> set(String key, int exp,
String value, PersistTo persist, ReplicateTo replicate);
OperationFuture<Boolean> add(String key, int exp,
String value, PersistTo persist);
OperationFuture<Boolean> add(String key, int exp,
String value, PersistTo persist, ReplicateTo replicate);
OperationFuture<Boolean> replace(String key, int exp,
String value, PersistTo persist);
OperationFuture<Boolean> replace(String key, int exp,
String value, PersistTo persist, ReplicateTo replicate);
CASResponse cas(String key, long cas,
String value, PersistTo req, ReplicateTo rep);
CASResponse cas(String key, long cas,
String value, PersistTo req);
OperationFuture<Boolean> delete(String key, PersistTo persist);
OperationFuture<Boolean> delete(String key, PersistTo persist,
ReplicateTo replicate);
Expand Down
46 changes: 46 additions & 0 deletions src/main/java/com/couchbase/client/ObservedException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Copyright (C) 2009-2012 Couchbase, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALING
* IN THE SOFTWARE.
*/
package com.couchbase.client;

/**
* An exception in Observe.
*/
public class ObservedException extends RuntimeException {

static final long serialVersionUID = -815352665628228664L;

public ObservedException() {
super();
}

public ObservedException(String message) {
super(message);
}

public ObservedException(String message, Throwable cause) {
super(message, cause);
}

public ObservedException(Throwable cause) {
super(cause);
}
}
Loading

0 comments on commit 4b300b2

Please sign in to comment.