Skip to content

Commit

Permalink
Merge branch 'selector_timeout'
Browse files Browse the repository at this point in the history
  • Loading branch information
Chinmay Soman committed Oct 25, 2011
2 parents 70fcfd8 + f13a9a0 commit f8d4cc2
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 25 deletions.
Expand Up @@ -20,8 +20,8 @@
import voldemort.store.nonblockingstore.NonblockingStoreCallback;
import voldemort.store.routed.BasicPipelineData;
import voldemort.store.routed.Pipeline;
import voldemort.store.routed.Response;
import voldemort.store.routed.Pipeline.Event;
import voldemort.store.routed.Response;
import voldemort.store.slop.HintedHandoff;
import voldemort.store.slop.Slop;
import voldemort.utils.ByteArray;
Expand Down
Expand Up @@ -34,8 +34,8 @@
import voldemort.store.nonblockingstore.NonblockingStoreCallback;
import voldemort.store.routed.GetAllPipelineData;
import voldemort.store.routed.Pipeline;
import voldemort.store.routed.Response;
import voldemort.store.routed.Pipeline.Event;
import voldemort.store.routed.Response;
import voldemort.utils.ByteArray;
import voldemort.versioning.Versioned;

Expand Down
Expand Up @@ -35,9 +35,9 @@
import voldemort.store.nonblockingstore.NonblockingStore;
import voldemort.store.nonblockingstore.NonblockingStoreCallback;
import voldemort.store.routed.Pipeline;
import voldemort.store.routed.Pipeline.Event;
import voldemort.store.routed.PutPipelineData;
import voldemort.store.routed.Response;
import voldemort.store.routed.Pipeline.Event;
import voldemort.store.slop.HintedHandoff;
import voldemort.store.slop.Slop;
import voldemort.utils.ByteArray;
Expand Down
Expand Up @@ -33,9 +33,9 @@
import voldemort.store.nonblockingstore.NonblockingStoreCallback;
import voldemort.store.routed.BasicPipelineData;
import voldemort.store.routed.Pipeline;
import voldemort.store.routed.Response;
import voldemort.store.routed.Pipeline.Event;
import voldemort.store.routed.Pipeline.Operation;
import voldemort.store.routed.Response;
import voldemort.utils.ByteArray;
import voldemort.utils.Utils;

Expand Down
42 changes: 24 additions & 18 deletions src/java/voldemort/store/socket/SocketStore.java
Expand Up @@ -219,7 +219,7 @@ public String getName() {
}

public void close() throws VoldemortException {
// don't close the socket pool, it is shared
// don't close the socket pool, it is shared
}

/**
Expand Down Expand Up @@ -328,29 +328,25 @@ public NonblockingStoreCallbackClientRequest(ClientRequest<T> clientRequest,
this.startNs = System.nanoTime();
}

private void invokeCallback(Object o, long requestTime) {
if(callback != null) {
try {
callback.requestComplete(o, requestTime);
} catch(Exception e) {
if(logger.isEnabledFor(Level.WARN))
logger.warn(e, e);
}
}
}

public void complete() {
try {
clientRequest.complete();
Object result = clientRequest.getResult();

if(callback != null) {
try {
callback.requestComplete(result, (System.nanoTime() - startNs)
/ Time.NS_PER_MS);
} catch(Exception e) {
if(logger.isEnabledFor(Level.WARN))
logger.warn(e, e);
}
}
invokeCallback(result, (System.nanoTime() - startNs) / Time.NS_PER_MS);
} catch(Exception e) {
if(callback != null) {
try {
callback.requestComplete(e, (System.nanoTime() - startNs) / Time.NS_PER_MS);
} catch(Exception ex) {
if(logger.isEnabledFor(Level.WARN))
logger.warn(ex, ex);
}
}
invokeCallback(e, (System.nanoTime() - startNs) / Time.NS_PER_MS);
} finally {
pool.checkin(destination, clientRequestExecutor);
isComplete = true;
Expand All @@ -377,6 +373,16 @@ public void parseResponse(DataInputStream inputStream) {
clientRequest.parseResponse(inputStream);
}

public void timeOut() {
clientRequest.timeOut();
invokeCallback(new UnreachableStoreException("ClientRequestExecutor timed out. Cannot complete request."),
(System.nanoTime() - startNs) / Time.NS_PER_MS);
pool.checkin(destination, clientRequestExecutor);
}

public boolean isTimedOut() {
return clientRequest.isTimedOut();
}
}

}
Expand Up @@ -40,6 +40,8 @@ public abstract class AbstractClientRequest<T> implements ClientRequest<T> {

private volatile boolean isParsed = false;

private volatile boolean isTimedOut = false;

protected abstract void formatRequestInternal(DataOutputStream outputStream) throws IOException;

protected abstract T parseResponseInternal(DataInputStream inputStream) throws IOException;
Expand Down Expand Up @@ -93,4 +95,12 @@ public boolean isComplete() {
return isComplete;
}

public final void timeOut() {
isTimedOut = true;
}

public boolean isTimedOut() {
return isTimedOut;
}

}
Expand Up @@ -77,4 +77,12 @@ public boolean formatRequest(DataOutputStream outputStream) {
return delegate.formatRequest(outputStream);
}

public void timeOut() {
delegate.timeOut();
}

public boolean isTimedOut() {
return delegate.isTimedOut();
}

}
19 changes: 19 additions & 0 deletions src/java/voldemort/store/socket/clientrequest/ClientRequest.java
Expand Up @@ -120,4 +120,23 @@ public interface ClientRequest<T> {

public boolean isComplete();

/**
* Called by the {@link ClientRequestExecutor} after it has timed out. This
* is different from the complete call, since the timeout event needs to be
* notified to the caller in a special way.
* <p/>
*
* This is used internally by the {@link ClientRequest} logic and should not
* be invoked by users of the sub-system.
*/

public void timeOut();

/**
* Returns <code>true</code> if {@link ClientRequestExecutor} timed out.
*
* @return <code>true</code> if timed out, <code>false</code> otherwise
*/

public boolean isTimedOut();
}
Expand Up @@ -28,7 +28,6 @@

import org.apache.log4j.Level;

import voldemort.utils.SelectorManager;
import voldemort.utils.SelectorManagerWorker;
import voldemort.utils.Time;

Expand All @@ -51,11 +50,13 @@ public class ClientRequestExecutor extends SelectorManagerWorker {
private ClientRequest<?> clientRequest;

private long expiration;
private boolean isExpired;

public ClientRequestExecutor(Selector selector,
SocketChannel socketChannel,
int socketBufferSize) {
super(selector, socketChannel, socketBufferSize);
isExpired = false;
}

public SocketChannel getSocketChannel() {
Expand All @@ -80,6 +81,7 @@ public synchronized boolean checkTimeout(SelectionKey selectionKey) {
if(logger.isEnabledFor(Level.WARN))
logger.warn("Client request associated with " + socketChannel.socket() + " timed out");

isExpired = true;
close();

return false;
Expand All @@ -98,7 +100,6 @@ public synchronized void addClientRequest(ClientRequest<?> clientRequest, long t
if(timeoutMs == -1) {
this.expiration = -1;
} else {
timeoutMs -= SelectorManager.SELECTOR_POLL_MS;
this.expiration = System.nanoTime() + (Time.NS_PER_MS * timeoutMs);

if(this.expiration < System.nanoTime())
Expand Down Expand Up @@ -266,7 +267,10 @@ private synchronized void completeClientRequest() {
clientRequest = null;
expiration = 0;

local.complete();
if(isExpired)
local.timeOut();
else
local.complete();

if(logger.isTraceEnabled())
logger.trace("Marked client associated with " + socketChannel.socket() + " as complete");
Expand Down
51 changes: 51 additions & 0 deletions test/unit/voldemort/store/routed/RoutedStoreTest.java
Expand Up @@ -1031,6 +1031,57 @@ public void testPutTimeout() throws Exception {
}
}

@Test
public void testGetTimeout() throws Exception {
int timeout = 50;
StoreDefinition definition = new StoreDefinitionBuilder().setName("test")
.setType("foo")
.setKeySerializer(new SerializerDefinition("test"))
.setValueSerializer(new SerializerDefinition("test"))
.setRoutingPolicy(RoutingTier.CLIENT)
.setRoutingStrategyType(RoutingStrategyType.CONSISTENT_STRATEGY)
.setReplicationFactor(3)
.setPreferredReads(3)
.setRequiredReads(3)
.setPreferredWrites(3)
.setRequiredWrites(3)
.build();
Map<Integer, Store<ByteArray, byte[], byte[]>> stores = new HashMap<Integer, Store<ByteArray, byte[], byte[]>>();
List<Node> nodes = new ArrayList<Node>();
int totalDelay = 0;
for(int i = 0; i < 3; i++) {
int delay = 4 + i * timeout;
totalDelay += delay;
Store<ByteArray, byte[], byte[]> store = new SleepyStore<ByteArray, byte[], byte[]>(delay,
new InMemoryStorageEngine<ByteArray, byte[], byte[]>("test"));
stores.put(i, store);
List<Integer> partitions = Arrays.asList(i);
nodes.add(new Node(i, "none", 0, 0, 0, partitions));
}

setFailureDetector(stores);

routedStoreThreadPool = Executors.newFixedThreadPool(3);
RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(true,
routedStoreThreadPool,
timeout);

RoutedStore routedStore = routedStoreFactory.create(new Cluster("test", nodes),
definition,
stores,
true,
failureDetector);

long start = System.nanoTime();
try {
routedStore.get(new ByteArray("test".getBytes()), null);
fail("Should have thrown");
} catch(InsufficientOperationalNodesException e) {
long elapsed = (System.nanoTime() - start) / Time.NS_PER_MS;
assertTrue(elapsed + " < " + totalDelay, elapsed < totalDelay);
}
}

/**
* See Issue #211: Unnecessary read repairs during getAll with more than one
* key
Expand Down

0 comments on commit f8d4cc2

Please sign in to comment.