Skip to content

Commit

Permalink
Addressed most of the feedback from the code review.
Browse files Browse the repository at this point in the history
- Renamed many variables, methods & classes
- Addressed most of the TODOs in my changes based on the feedback
  • Loading branch information
jayjwylie committed Oct 9, 2012
1 parent 714cd46 commit 4b05dcf
Show file tree
Hide file tree
Showing 14 changed files with 158 additions and 123 deletions.
2 changes: 1 addition & 1 deletion src/java/voldemort/client/TimeoutConfig.java
@@ -1,6 +1,6 @@
package voldemort.client;

import voldemort.utils.OpTimeMap;
import voldemort.common.OpTimeMap;

/**
* Encapsulates the timeouts, in ms, for various Voldemort operations
Expand Down
2 changes: 1 addition & 1 deletion src/java/voldemort/client/protocol/admin/SocketPool.java
Expand Up @@ -126,7 +126,7 @@ public void checkin(SocketDestination destination, SocketAndStreams socket) {

public void close(SocketDestination destination) {
socketFactory.setLastClosedTimestamp(destination);
pool.close(destination);
pool.reset(destination);
}

/**
Expand Down
@@ -1,11 +1,10 @@
package voldemort.utils;
package voldemort.common;

import java.util.HashMap;

import voldemort.common.VoldemortOpCode;

/**
* Encapsulates time to operation mapping
* Encapsulates time to Voldemort operation mapping
*
*/
public class OpTimeMap {
Expand Down
2 changes: 1 addition & 1 deletion src/java/voldemort/server/VoldemortConfig.java
Expand Up @@ -25,6 +25,7 @@
import voldemort.client.TimeoutConfig;
import voldemort.client.protocol.RequestFormatType;
import voldemort.cluster.failuredetector.FailureDetectorConfig;
import voldemort.common.OpTimeMap;
import voldemort.common.VoldemortOpCode;
import voldemort.server.scheduler.slop.StreamingSlopPusherJob;
import voldemort.store.bdb.BdbStorageConfiguration;
Expand All @@ -34,7 +35,6 @@
import voldemort.store.readonly.BinarySearchStrategy;
import voldemort.store.readonly.ReadOnlyStorageConfiguration;
import voldemort.utils.ConfigurationException;
import voldemort.utils.OpTimeMap;
import voldemort.utils.Props;
import voldemort.utils.Time;
import voldemort.utils.UndefinedPropertyException;
Expand Down
Expand Up @@ -40,15 +40,15 @@
import voldemort.utils.JmxUtils;
import voldemort.utils.Time;
import voldemort.utils.Utils;
import voldemort.utils.pool.KeyedResourcePool;
import voldemort.utils.pool.AsyncResourceRequest;
import voldemort.utils.pool.QueuedKeyedResourcePool;
import voldemort.utils.pool.ResourcePoolConfig;
import voldemort.utils.pool.ResourceRequest;

/**
* A pool of {@link ClientRequestExecutor} keyed off the
* {@link SocketDestination}. This is a wrapper around {@link KeyedResourcePool}
* that translates exceptions as well as providing some JMX access.
* {@link SocketDestination}. This is a wrapper around
* {@link QueuedKeyedResourcePool} that translates exceptions, provides some JMX
* access, and handles asynchronous requests for SocketDestinations.
*
* <p/>
*
Expand All @@ -58,7 +58,7 @@

public class ClientRequestExecutorPool implements SocketStoreFactory {

private final QueuedKeyedResourcePool<SocketDestination, ClientRequestExecutor> pool;
private final QueuedKeyedResourcePool<SocketDestination, ClientRequestExecutor> queuedPool;
private final ClientRequestExecutorFactory factory;
private final ClientSocketStats stats;
private final boolean jmxEnabled;
Expand Down Expand Up @@ -96,10 +96,10 @@ public ClientRequestExecutorPool(int selectors,
socketBufferSize,
socketKeepAlive,
stats);
this.pool = new QueuedKeyedResourcePool<SocketDestination, ClientRequestExecutor>(factory,
config);
this.queuedPool = new QueuedKeyedResourcePool<SocketDestination, ClientRequestExecutor>(factory,
config);
if(stats != null) {
this.stats.setPool(pool);
this.stats.setPool(queuedPool);
}
}

Expand Down Expand Up @@ -159,7 +159,7 @@ public ClientRequestExecutor checkout(SocketDestination destination) {
long start = System.nanoTime();
ClientRequestExecutor clientRequestExecutor;
try {
clientRequestExecutor = pool.checkout(destination);
clientRequestExecutor = queuedPool.checkout(destination);
} catch(Exception e) {
throw new UnreachableStoreException("Failure while checking out socket for "
+ destination + ": ", e);
Expand All @@ -180,7 +180,7 @@ public ClientRequestExecutor checkout(SocketDestination destination) {
*/
public void checkin(SocketDestination destination, ClientRequestExecutor clientRequestExecutor) {
try {
pool.checkin(destination, clientRequestExecutor);
queuedPool.checkin(destination, clientRequestExecutor);
} catch(Exception e) {
throw new VoldemortException("Failure while checking in socket for " + destination
+ ": ", e);
Expand All @@ -189,7 +189,7 @@ public void checkin(SocketDestination destination, ClientRequestExecutor clientR

public void close(SocketDestination destination) {
factory.setLastClosedTimestamp(destination);
pool.close(destination);
queuedPool.reset(destination);
}

/**
Expand All @@ -207,7 +207,7 @@ public void close() {
stats.close();
}
factory.close();
pool.close();
queuedPool.close();
}

public ClientSocketStats getStats() {
Expand All @@ -219,20 +219,21 @@ public <T> void submitAsync(SocketDestination destination,
NonblockingStoreCallback callback,
long timeoutMs,
String operationName) {
AsyncRequest<T> asyncRequest = new AsyncRequest<T>(destination,
delegate,
callback,
timeoutMs,
operationName);
pool.requestResource(destination, asyncRequest);
AsyncSocketDestinationRequest<T> asyncSocketDestinationRequest = new AsyncSocketDestinationRequest<T>(destination,
delegate,
callback,
timeoutMs,
operationName);
queuedPool.registerResourceRequest(destination, asyncSocketDestinationRequest);
return;
}

/**
* Wrap up an asynchronous request and actually issue it once a
* SocketDestination is checked out.
*/
private class AsyncRequest<T> implements ResourceRequest<ClientRequestExecutor> {
private class AsyncSocketDestinationRequest<T> implements
AsyncResourceRequest<ClientRequestExecutor> {

private final SocketDestination destination;
public final ClientRequest<T> delegate;
Expand All @@ -242,11 +243,11 @@ private class AsyncRequest<T> implements ResourceRequest<ClientRequestExecutor>

private final long startTimeNs;

public AsyncRequest(SocketDestination destination,
ClientRequest<T> delegate,
NonblockingStoreCallback callback,
long timeoutMs,
String operationName) {
public AsyncSocketDestinationRequest(SocketDestination destination,
ClientRequest<T> delegate,
NonblockingStoreCallback callback,
long timeoutMs,
String operationName) {
this.destination = destination;
this.delegate = delegate;
this.callback = callback;
Expand All @@ -263,8 +264,10 @@ public void useResource(ClientRequestExecutor clientRequestExecutor) {
+ " requestRef: "
+ System.identityHashCode(delegate)
+ " time: "
// TODO: output startTimeNs instead?
+ System.currentTimeMillis()
// Output time (ms) includes queueing delay (i.e.,
// time between when registerResourceRequest is
// called and time when useResource is invoked).
+ (this.startTimeNs / Time.NS_PER_MS)
+ " server: "
+ clientRequestExecutor.getSocketChannel()
.socket()
Expand Down
9 changes: 4 additions & 5 deletions src/java/voldemort/store/stats/ClientSocketStats.java
Expand Up @@ -25,7 +25,7 @@
import voldemort.store.socket.SocketDestination;
import voldemort.store.socket.clientrequest.ClientRequestExecutor;
import voldemort.utils.JmxUtils;
import voldemort.utils.pool.KeyedResourcePool;
import voldemort.utils.pool.QueuedKeyedResourcePool;

/**
* Some convenient statistics to track about the client requests
Expand All @@ -37,7 +37,7 @@ public class ClientSocketStats {
private final ClientSocketStats parent;
private final ConcurrentMap<SocketDestination, ClientSocketStats> statsMap;
private final SocketDestination destination;
private KeyedResourcePool<SocketDestination, ClientRequestExecutor> pool;
private QueuedKeyedResourcePool<SocketDestination, ClientRequestExecutor> pool;

private final AtomicInteger monitoringInterval = new AtomicInteger(10000);
private final Histogram checkoutTimeUsHistogram = new Histogram(20000, 100);
Expand All @@ -59,8 +59,7 @@ public class ClientSocketStats {
*/
public ClientSocketStats(ClientSocketStats parent,
SocketDestination destination,
KeyedResourcePool<SocketDestination, ClientRequestExecutor> pool,
int jmxId) {
QueuedKeyedResourcePool<SocketDestination, ClientRequestExecutor> pool, int jmxId) {
this.parent = parent;
this.statsMap = null;
this.destination = destination;
Expand Down Expand Up @@ -212,7 +211,7 @@ public int getMonitoringInterval() {
return this.monitoringInterval.get();
}

public void setPool(KeyedResourcePool<SocketDestination, ClientRequestExecutor> pool) {
public void setPool(QueuedKeyedResourcePool<SocketDestination, ClientRequestExecutor> pool) {
this.pool = pool;
}

Expand Down
Expand Up @@ -5,7 +5,7 @@
* useResource, handleTimeout, or handleException expected to be invoked within
* deadline specified by getDeadlineNs.
*/
public interface ResourceRequest<V> {
public interface AsyncResourceRequest<V> {

/**
* To be invoked with resource to use.
Expand Down
21 changes: 10 additions & 11 deletions src/java/voldemort/utils/pool/KeyedResourcePool.java
Expand Up @@ -164,9 +164,11 @@ protected boolean attemptGrow(K key, Pool<V> pool) throws Exception {
protected Pool<V> getResourcePoolForKey(K key) {
Pool<V> resourcePool = resourcePoolMap.get(key);
if(resourcePool == null) {
resourcePool = new Pool<V>(this.resourcePoolConfig);
resourcePoolMap.putIfAbsent(key, resourcePool);
resourcePool = resourcePoolMap.get(key);
Pool<V> newResourcePool = new Pool<V>(this.resourcePoolConfig);
resourcePool = resourcePoolMap.putIfAbsent(key, newResourcePool);
if(resourcePool == null) {
resourcePool = newResourcePool;
}
}
return resourcePool;
}
Expand Down Expand Up @@ -245,16 +247,15 @@ public void close() {
}

/**
* "Close" a specific resource pool by destroying all the resources in the
* pool. This method does not affect whether any pool is "open" in the sense
* of permitting new resources to be added to it.
* Reset a specific resource pool. Destroys all the resources in the pool.
* This method does not affect whether the pool is "open" in the sense of
* permitting new resources to be added to it.
*
* @param key The key for the pool to close.
* @param key The key for the pool to reset.
*/
public void close(K key) {
public void reset(K key) {
Pool<V> resourcePool = getResourcePoolForExistingKey(key);
List<V> list = resourcePool.close();
// destroy each resource currently in the pool
for(V value: list)
destroyResource(key, resourcePool, value);
}
Expand Down Expand Up @@ -362,8 +363,6 @@ public <K> boolean attemptGrow(K key, ResourceFactory<K, V> objectFactory) throw
if(resource != null) {
if(!nonBlockingPut(resource)) {
this.size.decrementAndGet();
// TODO: Do we need to destroy the non-null,
// non-enqueued resource?
objectFactory.destroy(key, resource);
return false;
}
Expand Down

0 comments on commit 4b05dcf

Please sign in to comment.