Skip to content

Commit

Permalink
SPY-112: Throw CancellationException when future is cancelled.
Browse files Browse the repository at this point in the history
This changeset throws the semantically more correct
CancellationException when a operation future is cancelled. Old
code that catches RuntimeExceptions will still work, but it allows
for more flexible catching on the application level.

Change-Id: I476f06e8989f06423be6e186c6565179e270df13
Reviewed-on: http://review.couchbase.org/24654
Reviewed-by: Matt Ingenthron <matt@couchbase.com>
Tested-by: Michael Nitschinger <michael.nitschinger@couchbase.com>
  • Loading branch information
daschl authored and Michael Nitschinger committed Feb 21, 2013
1 parent 781cb7e commit 84ebf9a
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 9 deletions.
44 changes: 37 additions & 7 deletions src/main/java/net/spy/memcached/MemcachedClient.java
Expand Up @@ -34,6 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -558,6 +559,7 @@ public <T> CASResponse cas(String key, long casId, T value,
* @param tc the transcoder to serialize and unserialize the value
* @return a CASResponse
* @throws OperationTimeoutException if global operation timeout is exceeded
* @throws CancellationException if operation was canceled
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
Expand All @@ -573,7 +575,11 @@ public <T> CASResponse cas(String key, long casId, int exp, T value,
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted waiting for value", e);
} catch (ExecutionException e) {
throw new RuntimeException("Exception waiting for value", e);
if(e.getCause() instanceof CancellationException) {
throw (CancellationException) e.getCause();
} else {
throw new RuntimeException("Exception waiting for value", e);
}
} catch (TimeoutException e) {
throw new OperationTimeoutException("Timeout waiting for value", e);
}
Expand Down Expand Up @@ -922,6 +928,7 @@ public OperationFuture<CASValue<Object>> asyncGets(final String key) {
* @param tc the transcoder to serialize and unserialize value
* @return the result from the cache and CAS id (null if there is none)
* @throws OperationTimeoutException if global operation timeout is exceeded
* @throws CancellationException if operation was canceled
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
Expand All @@ -931,7 +938,11 @@ public <T> CASValue<T> gets(String key, Transcoder<T> tc) {
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted waiting for value", e);
} catch (ExecutionException e) {
throw new RuntimeException("Exception waiting for value", e);
if(e.getCause() instanceof CancellationException) {
throw (CancellationException) e.getCause();
} else {
throw new RuntimeException("Exception waiting for value", e);
}
} catch (TimeoutException e) {
throw new OperationTimeoutException("Timeout waiting for value", e);
}
Expand All @@ -947,6 +958,7 @@ public <T> CASValue<T> gets(String key, Transcoder<T> tc) {
* @return the result from the cache (null if there is none)
* @throws OperationTimeoutException if the global operation timeout is
* exceeded
* @throws CancellationException if operation was canceled
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
Expand All @@ -957,7 +969,11 @@ public <T> CASValue<T> getAndTouch(String key, int exp, Transcoder<T> tc) {
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted waiting for value", e);
} catch (ExecutionException e) {
throw new RuntimeException("Exception waiting for value", e);
if(e.getCause() instanceof CancellationException) {
throw (CancellationException) e.getCause();
} else {
throw new RuntimeException("Exception waiting for value", e);
}
} catch (TimeoutException e) {
throw new OperationTimeoutException("Timeout waiting for value", e);
}
Expand Down Expand Up @@ -1001,6 +1017,7 @@ public CASValue<Object> gets(String key) {
* @return the result from the cache (null if there is none)
* @throws OperationTimeoutException if the global operation timeout is
* exceeded
* @throws CancellationException if operation was canceled
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
Expand All @@ -1010,7 +1027,11 @@ public <T> T get(String key, Transcoder<T> tc) {
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted waiting for value", e);
} catch (ExecutionException e) {
throw new RuntimeException("Exception waiting for value", e);
if(e.getCause() instanceof CancellationException) {
throw (CancellationException) e.getCause();
} else {
throw new RuntimeException("Exception waiting for value", e);
}
} catch (TimeoutException e) {
throw new OperationTimeoutException("Timeout waiting for value", e);
}
Expand Down Expand Up @@ -1294,6 +1315,7 @@ public void gotData(String k, int flags, long cas, byte[] data) {
* @return a map of the values (for each value that exists)
* @throws OperationTimeoutException if the global operation timeout is
* exceeded
* @throws CancellationException if operation was canceled
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
Expand All @@ -1305,9 +1327,13 @@ public <T> Map<String, T> getBulk(Iterator<String> keyIter,
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted getting bulk values", e);
} catch (ExecutionException e) {
throw new RuntimeException("Failed getting bulk values", e);
if(e.getCause() instanceof CancellationException) {
throw (CancellationException) e.getCause();
} else {
throw new RuntimeException("Exception waiting for bulk values", e);
}
} catch (TimeoutException e) {
throw new OperationTimeoutException("Timeout waiting for bulkvalues", e);
throw new OperationTimeoutException("Timeout waiting for bulk values", e);
}
}

Expand Down Expand Up @@ -1681,7 +1707,11 @@ private long mutateWithDefault(Mutator t, String key, long by, long def,
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted waiting for store", e);
} catch (ExecutionException e) {
throw new RuntimeException("Failed waiting for store", e);
if(e.getCause() instanceof CancellationException) {
throw (CancellationException) e.getCause();
} else {
throw new RuntimeException("Failed waiting for store", e);
}
} catch (TimeoutException e) {
throw new OperationTimeoutException("Timeout waiting to mutate or init"
+ " value", e);
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/net/spy/memcached/internal/BulkGetFuture.java
Expand Up @@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -149,7 +150,7 @@ private Map<String, T> internalGet(long to, TimeUnit unit,
}
for (Operation op : ops) {
if (op.isCancelled()) {
throw new ExecutionException(new RuntimeException("Cancelled"));
throw new ExecutionException(new CancellationException("Cancelled"));
}
if (op.hasErrored()) {
throw new ExecutionException(op.getException());
Expand Down
Expand Up @@ -23,6 +23,7 @@

package net.spy.memcached.internal;

import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -166,7 +167,7 @@ public T get(long duration, TimeUnit units) throws InterruptedException,
throw new ExecutionException(op.getException());
}
if (isCancelled()) {
throw new ExecutionException(new RuntimeException("Cancelled"));
throw new ExecutionException(new CancellationException("Cancelled"));
}
if (op != null && op.isTimedOut()) {
throw new ExecutionException(new CheckedOperationTimeoutException(
Expand Down
2 changes: 2 additions & 0 deletions src/test/java/net/spy/memcached/CancellationBaseCase.java
Expand Up @@ -26,6 +26,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -57,6 +58,7 @@ private void tryCancellation(Future<?> f) throws Exception {
fail("Expected cancellation, got " + o);
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof RuntimeException);
assertTrue(e.getCause() instanceof CancellationException);
assertEquals("Cancelled", e.getCause().getMessage());
}
}
Expand Down

0 comments on commit 84ebf9a

Please sign in to comment.