Skip to content

Commit

Permalink
Moved operation timeout into the connection factory.
Browse files Browse the repository at this point in the history
It shouldn't change at runtime in a client, as the current model was
not thread safe.  In order for this to be dynamically reconfigurable,
the field either needs to be volatile or access to the value must be
synchronized.

By having the value be declared final and removing the mutators and
accessors, we can guarantee correctness.
  • Loading branch information
dustin committed May 3, 2008
1 parent 33bcb1c commit 4bcd22c
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 45 deletions.
5 changes: 5 additions & 0 deletions src/main/java/net/spy/memcached/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,9 @@ MemcachedNode createMemcachedNode(SocketAddress sa,
*/
OperationFactory getOperationFactory();

/**
* Get the operation timeout used by this connection.
*/
long getOperationTimeout();

}
15 changes: 15 additions & 0 deletions src/main/java/net/spy/memcached/DefaultConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ public class DefaultConnectionFactory extends SpyObject
*/
public static final int DEFAULT_READ_BUFFER_SIZE=16384;

/**
* Default operation timeout in milliseconds.
*/
public static final long DEFAULT_OPERATION_TIMEOUT = 1000;

private final int opQueueLen;
private final int readBufSize;
private final HashAlgorithm hashAlg;
Expand Down Expand Up @@ -119,8 +124,18 @@ public HashAlgorithm getHashAlg() {
return hashAlg;
}

/* (non-Javadoc)
* @see net.spy.memcached.ConnectionFactory#getOperationFactory()
*/
public OperationFactory getOperationFactory() {
return new AsciiOperationFactory();
}

/* (non-Javadoc)
* @see net.spy.memcached.ConnectionFactory#getOperationTimeout()
*/
public long getOperationTimeout() {
return DEFAULT_OPERATION_TIMEOUT;
}

}
56 changes: 20 additions & 36 deletions src/main/java/net/spy/memcached/MemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,10 @@ public final class MemcachedClient extends SpyThread {
*/
public static final int MAX_KEY_LENGTH = 250;

/**
* Default operation timeout in milliseconds.
*/
public static final long DEFAULT_OPERATION_TIMEOUT = 1000;

private volatile boolean running=true;
private volatile boolean shuttingDown=false;

private long globalOperationTimeout = DEFAULT_OPERATION_TIMEOUT;
private final long operationTimeout;

private final MemcachedConnection conn;
final OperationFactory opFact;
Expand Down Expand Up @@ -147,11 +142,16 @@ public MemcachedClient(ConnectionFactory cf, List<InetSocketAddress> addrs)
throw new IllegalArgumentException(
"You must have at least one server to connect to");
}
if(cf.getOperationTimeout() <= 0) {
throw new IllegalArgumentException(
"Operation timeout must be positive.");
}
transcoder=new SerializingTranscoder();
opFact=cf.getOperationFactory();
assert opFact != null : "Connection factory failed to make op factory";
conn=cf.createConnection(addrs);
assert conn != null : "Connection factory failed to make a connection";
operationTimeout = cf.getOperationTimeout();
setName("Memcached IO over " + conn);
start();
}
Expand All @@ -174,22 +174,6 @@ public Transcoder<Object> getTranscoder() {
return transcoder;
}

/**
* Gets the global operation timeout
* @return long Timeout in milliseconds
*/
public long getGlobalOperationTimeout() {
return globalOperationTimeout;
}

/**
* Sets the global operation timeout
* @param globalOperationTimeout long Timeout in milliseconds
*/
public void setGlobalOperationTimeout(long globalOperationTimeout) {
this.globalOperationTimeout = globalOperationTimeout;
}

private void validateKey(String key) {
byte[] keyBytes=KeyUtil.getKeyBytes(key);
if(keyBytes.length > MAX_KEY_LENGTH) {
Expand Down Expand Up @@ -244,7 +228,7 @@ private <T> Future<Boolean> asyncStore(StoreType storeType, String key,
CachedData co=tc.encode(value);
final CountDownLatch latch=new CountDownLatch(1);
final OperationFuture<Boolean> rv=new OperationFuture<Boolean>(latch,
globalOperationTimeout);
operationTimeout);
Operation op=opFact.store(storeType, key, co.getFlags(),
exp, co.getData(), new OperationCallback() {
public void receivedStatus(OperationStatus val) {
Expand Down Expand Up @@ -277,7 +261,7 @@ public <T> Future<CASResponse> asyncCAS(String key, long casId, T value,
CachedData co=tc.encode(value);
final CountDownLatch latch=new CountDownLatch(1);
final OperationFuture<CASResponse> rv=new OperationFuture<CASResponse>(
latch, globalOperationTimeout);
latch, operationTimeout);
Operation op=opFact.cas(key, casId, co.getFlags(),
co.getData(), new OperationCallback() {
public void receivedStatus(OperationStatus val) {
Expand Down Expand Up @@ -323,7 +307,7 @@ public Future<CASResponse> asyncCAS(String key, long casId, Object value) {
public <T> CASResponse cas(String key, long casId, T value,
Transcoder<T> tc) throws OperationTimeoutException {
try {
return asyncCAS(key, casId, value, tc).get(globalOperationTimeout,
return asyncCAS(key, casId, value, tc).get(operationTimeout,
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted waiting for value", e);
Expand Down Expand Up @@ -543,7 +527,7 @@ public <T> Future<T> asyncGet(final String key, final Transcoder<T> tc) {

final CountDownLatch latch=new CountDownLatch(1);
final OperationFuture<T> rv=new OperationFuture<T>(latch,
globalOperationTimeout);
operationTimeout);

Operation op=opFact.get(key,
new GetOperation.Callback() {
Expand Down Expand Up @@ -586,7 +570,7 @@ public <T> Future<CASValue<T>> asyncGets(final String key,

final CountDownLatch latch=new CountDownLatch(1);
final OperationFuture<CASValue<T>> rv=
new OperationFuture<CASValue<T>>(latch, globalOperationTimeout);
new OperationFuture<CASValue<T>>(latch, operationTimeout);

Operation op=opFact.gets(key,
new GetsOperation.Callback() {
Expand Down Expand Up @@ -631,7 +615,7 @@ public <T> CASValue<T> gets(String key, Transcoder<T> tc)
throws OperationTimeoutException {
try {
return asyncGets(key, tc).get(
globalOperationTimeout, TimeUnit.MILLISECONDS);
operationTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted waiting for value", e);
} catch (ExecutionException e) {
Expand Down Expand Up @@ -666,7 +650,7 @@ public <T> T get(String key, Transcoder<T> tc)
throws OperationTimeoutException {
try {
return asyncGet(key, tc).get(
globalOperationTimeout, TimeUnit.MILLISECONDS);
operationTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted waiting for value", e);
} catch (ExecutionException e) {
Expand Down Expand Up @@ -815,7 +799,7 @@ public <T> Map<String, T> getBulk(Collection<String> keys, Transcoder<T> tc)
throws OperationTimeoutException {
try {
return asyncGetBulk(keys, tc).get(
globalOperationTimeout, TimeUnit.MILLISECONDS);
operationTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted getting bulk values", e);
} catch (ExecutionException e) {
Expand Down Expand Up @@ -888,7 +872,7 @@ public void complete() {
});
}});
try {
blatch.await(globalOperationTimeout, TimeUnit.MILLISECONDS);
blatch.await(operationTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted waiting for versions", e);
}
Expand Down Expand Up @@ -928,7 +912,7 @@ public void complete() {
}});
}});
try {
blatch.await(globalOperationTimeout, TimeUnit.MILLISECONDS);
blatch.await(operationTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted waiting for stats", e);
}
Expand All @@ -950,7 +934,7 @@ public void complete() {
latch.countDown();
}}));
try {
if (!latch.await(globalOperationTimeout, TimeUnit.MILLISECONDS)) {
if (!latch.await(operationTimeout, TimeUnit.MILLISECONDS)) {
throw new OperationTimeoutException(
"Mutate operation timed out, unable to modify counter ["
+ key + "]");
Expand Down Expand Up @@ -997,7 +981,7 @@ private long mutateWithDefault(Mutator t, String key,
Future<Boolean> f=asyncStore(StoreType.add,
key, 0, String.valueOf(def));
try {
if(f.get(globalOperationTimeout, TimeUnit.MILLISECONDS)) {
if(f.get(operationTimeout, TimeUnit.MILLISECONDS)) {
rv=def;
} else {
rv=mutate(t, key, by, 0, 0);
Expand Down Expand Up @@ -1054,7 +1038,7 @@ public long decr(String key, int by, long def)
public Future<Boolean> delete(String key, int when) {
final CountDownLatch latch=new CountDownLatch(1);
final OperationFuture<Boolean> rv=new OperationFuture<Boolean>(latch,
globalOperationTimeout);
operationTimeout);
DeleteOperation op=opFact.delete(key, when,
new OperationCallback() {
public void receivedStatus(OperationStatus s) {
Expand Down Expand Up @@ -1097,7 +1081,7 @@ public void complete() {
return op;
}});
return new OperationFuture<Boolean>(blatch, flushResult,
globalOperationTimeout) {
operationTimeout) {
@Override
public boolean cancel(boolean ign) {
boolean rv=false;
Expand Down
15 changes: 6 additions & 9 deletions src/test/java/net/spy/memcached/LongClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,6 @@
*/
public class LongClientTest extends ClientBaseCase {

@Override
protected void initClient(ConnectionFactory cf) throws Exception {
// TODO Auto-generated method stub
super.initClient(cf);
// This test gets pretty slow in cobertura
client.setGlobalOperationTimeout(15000);
}

public void testParallelGet() throws Throwable {
// Get a connection with the get optimization disabled.
client.shutdown();
Expand All @@ -36,7 +28,12 @@ public MemcachedConnection createConnection(
MemcachedConnection rv = super.createConnection(addrs);
rv.setGetOptimization(false);
return rv;
}});
}
@Override
public long getOperationTimeout() {
return 15000;
}
});

// Throw in some seed data.
byte data[]=new byte[32768];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,34 @@ public void testNullFactoryConstructor() throws Exception {
}
}

public void testNegativeTimeout() throws Exception {
try {
client = new MemcachedClient(new DefaultConnectionFactory() {
@Override
public long getOperationTimeout() {
return -1;
}},
AddrUtil.getAddresses("127.0.0.1:11211"));
fail("Expected null pointer exception, got " + client);
} catch(IllegalArgumentException e) {
assertEquals("Operation timeout must be positive.", e.getMessage());
}
}

public void testZeroTimeout() throws Exception {
try {
client = new MemcachedClient(new DefaultConnectionFactory() {
@Override
public long getOperationTimeout() {
return 0;
}},
AddrUtil.getAddresses("127.0.0.1:11211"));
fail("Expected null pointer exception, got " + client);
} catch(IllegalArgumentException e) {
assertEquals("Operation timeout must be positive.", e.getMessage());
}
}

public void testConnFactoryWithoutOpFactory() throws Exception {
try {
client = new MemcachedClient(new DefaultConnectionFactory(){
Expand Down

0 comments on commit 4bcd22c

Please sign in to comment.