Permalink
Browse files

Merging changes from release-0901hb12

  • Loading branch information...
1 parent e8b6aba commit 44315a9beb70f0963a3cd7887b441f4379072866 @ctasada committed Dec 11, 2012
@@ -292,15 +292,15 @@ public void close() throws VoldemortException {
return blockingClientRequest.getResult();
} catch(InterruptedException e) {
- clientRequestExecutor.close();
+ pool.getFactory().destroy(destination, clientRequestExecutor);
if(logger.isDebugEnabled())
debugMsgStr += "unreachable: " + e.getMessage();
throw new UnreachableStoreException("Failure in " + operationName + " on "
+ destination + ": " + e.getMessage(), e);
} catch(IOException e) {
- clientRequestExecutor.close();
+ pool.getFactory().destroy(destination, clientRequestExecutor);
if(logger.isDebugEnabled())
debugMsgStr += "failure: " + e.getMessage();
@@ -310,7 +310,7 @@ public void close() throws VoldemortException {
} finally {
if(blockingClientRequest != null && !blockingClientRequest.isComplete()) {
// close the executor if we timed out
- clientRequestExecutor.close();
+ pool.getFactory().destroy(destination, clientRequestExecutor);
}
if(logger.isDebugEnabled()) {
@@ -445,8 +445,7 @@ private void invokeCallback(Object o, long requestTime) {
+ ":"
+ clientRequestExecutor.getSocketChannel()
.socket()
- .getLocalPort() + " result: "
- + o);
+ .getLocalPort() + " result: " + o);
}
callback.requestComplete(o, requestTime);
@@ -94,8 +94,7 @@ public ClientRequestExecutorFactory(int selectors,
* Close the ClientRequestExecutor.
*/
- public void destroy(SocketDestination dest, ClientRequestExecutor clientRequestExecutor)
- throws Exception {
+ public void destroy(SocketDestination dest, ClientRequestExecutor clientRequestExecutor) {
clientRequestExecutor.close();
int numDestroyed = destroyed.incrementAndGet();
if(stats != null) {
@@ -120,7 +119,11 @@ public ClientRequestExecutor create(SocketDestination dest) throws Exception {
+ dest.getPort() + " using protocol "
+ dest.getRequestFormatType().getCode());
- SocketChannel socketChannel = SocketChannel.open();
+ SocketChannel socketChannel = null;
+ ClientRequestExecutor clientRequestExecutor = null;
+
+ try {
+ socketChannel = SocketChannel.open();
socketChannel.socket().setReceiveBufferSize(this.socketBufferSize);
socketChannel.socket().setSendBufferSize(this.socketBufferSize);
socketChannel.socket().setTcpNoDelay(true);
@@ -141,23 +144,15 @@ public ClientRequestExecutor create(SocketDestination dest) throws Exception {
long remaining = this.connectTimeoutMs - duration;
if(remaining < 0) {
- // Don't forget to close the socket before we throw our
- // exception or they'll leak :(
- try {
- socketChannel.close();
- } catch(Exception e) {
- if(logger.isEnabledFor(Level.WARN))
- logger.warn(e, e);
- }
-
throw new ConnectException("Cannot connect socket " + numCreated + " for "
+ dest.getHost() + ":" + dest.getPort() + " after "
+ duration + " ms");
}
if(logger.isTraceEnabled())
- logger.trace("Still creating socket " + numCreated + " for " + dest.getHost() + ":"
- + dest.getPort() + ", " + remaining + " ms. remaining to connect");
+ logger.trace("Still creating socket " + numCreated + " for " + dest.getHost()
+ + ":" + dest.getPort() + ", " + remaining
+ + " ms. remaining to connect");
try {
// Break up the connection timeout into smaller units,
@@ -174,7 +169,8 @@ public ClientRequestExecutor create(SocketDestination dest) throws Exception {
if(logger.isDebugEnabled())
logger.debug("Created socket " + numCreated + " for " + dest.getHost() + ":"
+ dest.getPort() + " using protocol "
- + dest.getRequestFormatType().getCode() + " after " + duration + " ms.");
+ + dest.getRequestFormatType().getCode() + " after " + duration
+ + " ms.");
// check buffer sizes--you often don't get out what you put in!
if(socketChannel.socket().getReceiveBufferSize() != this.socketBufferSize)
@@ -191,7 +187,7 @@ public ClientRequestExecutor create(SocketDestination dest) throws Exception {
% selectorManagers.length];
Selector selector = selectorManager.getSelector();
- ClientRequestExecutor clientRequestExecutor = new ClientRequestExecutor(selector,
+ clientRequestExecutor = new ClientRequestExecutor(selector,
socketChannel,
socketBufferSize);
BlockingClientRequest<String> clientRequest = new BlockingClientRequest<String>(new ProtocolNegotiatorClientRequest(dest.getRequestFormatType()),
@@ -201,23 +197,28 @@ public ClientRequestExecutor create(SocketDestination dest) throws Exception {
selectorManager.registrationQueue.add(clientRequestExecutor);
selector.wakeup();
- // Block while we wait for the protocol negotiation to complete.
+ // Block while we wait for protocol negotiation to complete. May
+ // throw interrupted exception
clientRequest.await();
- try {
- // This will throw an error if the result of the protocol
- // negotiation failed, otherwise it returns an uninteresting token
- // we can safely ignore.
+ // Either returns uninteresting token, or throws exception if
+ // protocol negotiation failed.
clientRequest.getResult();
} catch(Exception e) {
- // Don't forget to close the socket before we throw our exception or
- // they'll leak :(
+ // Make sure not to leak socketChannels
+ if(socketChannel != null) {
try {
socketChannel.close();
} catch(Exception ex) {
if(logger.isEnabledFor(Level.WARN))
logger.warn(ex, ex);
}
+ }
+ // If clientRequestExector is not null, some additional clean up may
+ // be warranted. However, clientRequestExecutor.close(), the
+ // "obvious" clean up, is not safe to call here. This is because
+ // .close() checks in a resource to the KeyedResourcePool that was
+ // never checked out.
throw e;
}
@@ -63,7 +63,7 @@ public ClientRequestExecutorPool(int selectors,
.setMaxInvalidAttempts(maxConnectionsPerNode)
.setTimeout(connectionTimeoutMs,
TimeUnit.MILLISECONDS);
- this.jmxEnabled = jmxEnabled;
+ this.jmxEnabled = jmxEnabled;
if(this.jmxEnabled) {
stats = new ClientSocketStats();
JmxUtils.registerMbean(new ClientSocketStatsJmx(stats),

0 comments on commit 44315a9

Please sign in to comment.