Skip to content

Commit

Permalink
Fixes for connection leak and ZenStoreClient config
Browse files Browse the repository at this point in the history
- Applied fix for socketChannel leak in ClientRequestExecutorFactory.create()
- Added comments to document other code paths at risk of leaking socketDestinations
- changed ClientConfig default from ZenStoreClient to DefaultStoreClient
- updated release notes
  • Loading branch information
jayjwylie committed Oct 29, 2012
1 parent c227e34 commit 6246d4e
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 102 deletions.
2 changes: 1 addition & 1 deletion META-INF/MANIFEST.MF
Expand Up @@ -2,6 +2,6 @@ Manifest-Version: 1.0
Ant-Version: Apache Ant 1.7.1
Created-By: 20.2-b06 (Sun Microsystems Inc.)
Implementation-Title: Voldemort
Implementation-Version: 1.0.0
Implementation-Version: 1.1.0
Implementation-Vendor: LinkedIn

11 changes: 9 additions & 2 deletions release_notes.txt
@@ -1,6 +1,13 @@
Release 1.1.0.on 10/19/2012
Release 1.1.1 on 10/30/2012

Changes made since release 1.1.0
Changed made since release 1.1.0
* Fixed connection leak in ClientRequestExecutorFactory
* Changed client to default to DefaultStoreClient


Release 1.1.0 on 10/19/2012

Changes made since release 1.0.0

IMPORTANT NOTE : This release has significant changes to the BDB storage layer.
Users are required to read the bin/PREUPGRADE_FOR_1_1_X_README file
Expand Down
6 changes: 4 additions & 2 deletions src/java/voldemort/client/ClientConfig.java
Expand Up @@ -68,8 +68,10 @@ public class ClientConfig {
private volatile boolean enablePipelineRoutedStore = true;
private volatile int clientZoneId = Zone.DEFAULT_ZONE_ID;

// Flag to control which store client to use. Default = Enhanced
private volatile boolean useDefaultClient = false;
// Flag to control which store client to use:
// true = DefaultStoreClient
// false = ZenStoreClient
private volatile boolean useDefaultClient = true;

private volatile String failureDetectorImplementation = FailureDetectorConfig.DEFAULT_IMPLEMENTATION_CLASS_NAME;
private volatile long failureDetectorBannagePeriod = FailureDetectorConfig.DEFAULT_BANNAGE_PERIOD;
Expand Down
Expand Up @@ -120,104 +120,106 @@ public ClientRequestExecutor create(SocketDestination dest) throws Exception {
+ dest.getPort() + " using protocol "
+ dest.getRequestFormatType().getCode());

SocketChannel socketChannel = SocketChannel.open();
socketChannel.socket().setReceiveBufferSize(this.socketBufferSize);
socketChannel.socket().setSendBufferSize(this.socketBufferSize);
socketChannel.socket().setTcpNoDelay(true);
socketChannel.socket().setSoTimeout(soTimeoutMs);
socketChannel.socket().setKeepAlive(this.socketKeepAlive);
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress(dest.getHost(), dest.getPort()));

long startTime = System.currentTimeMillis();
long duration = 0;
long currWaitTime = 1;
long prevWaitTime = 1;

// Since we're non-blocking and it takes a non-zero amount of time
// to connect, invoke finishConnect and loop.
while(!socketChannel.finishConnect()) {
duration = System.currentTimeMillis() - startTime;
long remaining = this.connectTimeoutMs - duration;

if(remaining < 0) {
// Don't forget to close the socket before we throw our
// exception or they'll leak :(
SocketChannel socketChannel = null;
ClientRequestExecutor clientRequestExecutor = null;

try {
socketChannel = SocketChannel.open();
socketChannel.socket().setReceiveBufferSize(this.socketBufferSize);
socketChannel.socket().setSendBufferSize(this.socketBufferSize);
socketChannel.socket().setTcpNoDelay(true);
socketChannel.socket().setSoTimeout(soTimeoutMs);
socketChannel.socket().setKeepAlive(this.socketKeepAlive);
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress(dest.getHost(), dest.getPort()));

long startTime = System.currentTimeMillis();
long duration = 0;
long currWaitTime = 1;
long prevWaitTime = 1;

// Since we're non-blocking and it takes a non-zero amount of time
// to connect, invoke finishConnect and loop.
while(!socketChannel.finishConnect()) {
duration = System.currentTimeMillis() - startTime;
long remaining = this.connectTimeoutMs - duration;

if(remaining < 0) {
throw new ConnectException("Cannot connect socket " + numCreated + " for "
+ dest.getHost() + ":" + dest.getPort() + " after "
+ duration + " ms");
}

if(logger.isTraceEnabled())
logger.trace("Still creating socket " + numCreated + " for " + dest.getHost()
+ ":" + dest.getPort() + ", " + remaining
+ " ms. remaining to connect");

try {
socketChannel.close();
} catch(Exception e) {
// Break up the connection timeout into smaller units,
// employing a Fibonacci-style back-off (1, 2, 3, 5, 8, ...)
Thread.sleep(Math.min(remaining, currWaitTime));
currWaitTime = Math.min(currWaitTime + prevWaitTime, 50);
prevWaitTime = currWaitTime - prevWaitTime;
} catch(InterruptedException e) {
if(logger.isEnabledFor(Level.WARN))
logger.warn(e, e);
}

throw new ConnectException("Cannot connect socket " + numCreated + " for "
+ dest.getHost() + ":" + dest.getPort() + " after "
+ duration + " ms");
}

if(logger.isTraceEnabled())
logger.trace("Still creating socket " + numCreated + " for " + dest.getHost() + ":"
+ dest.getPort() + ", " + remaining + " ms. remaining to connect");

try {
// Break up the connection timeout into smaller units,
// employing a Fibonacci-style back-off (1, 2, 3, 5, 8, ...)
Thread.sleep(Math.min(remaining, currWaitTime));
currWaitTime = Math.min(currWaitTime + prevWaitTime, 50);
prevWaitTime = currWaitTime - prevWaitTime;
} catch(InterruptedException e) {
if(logger.isEnabledFor(Level.WARN))
logger.warn(e, e);
}
}

if(logger.isDebugEnabled())
logger.debug("Created socket " + numCreated + " for " + dest.getHost() + ":"
+ dest.getPort() + " using protocol "
+ dest.getRequestFormatType().getCode() + " after " + duration + " ms.");

// check buffer sizes--you often don't get out what you put in!
if(socketChannel.socket().getReceiveBufferSize() != this.socketBufferSize)
logger.debug("Requested socket receive buffer size was " + this.socketBufferSize
+ " bytes but actual size is "
+ socketChannel.socket().getReceiveBufferSize() + " bytes.");

if(socketChannel.socket().getSendBufferSize() != this.socketBufferSize)
logger.debug("Requested socket send buffer size was " + this.socketBufferSize
+ " bytes but actual size is "
+ socketChannel.socket().getSendBufferSize() + " bytes.");

ClientRequestSelectorManager selectorManager = selectorManagers[counter.getAndIncrement()
% selectorManagers.length];

Selector selector = selectorManager.getSelector();
ClientRequestExecutor clientRequestExecutor = new ClientRequestExecutor(selector,
socketChannel,
socketBufferSize);
BlockingClientRequest<String> clientRequest = new BlockingClientRequest<String>(new ProtocolNegotiatorClientRequest(dest.getRequestFormatType()),
this.getTimeout());
clientRequestExecutor.addClientRequest(clientRequest);

selectorManager.registrationQueue.add(clientRequestExecutor);
selector.wakeup();

// Block while we wait for the protocol negotiation to complete.
clientRequest.await();

try {
// This will throw an error if the result of the protocol
// negotiation failed, otherwise it returns an uninteresting token
// we can safely ignore.
if(logger.isDebugEnabled())
logger.debug("Created socket " + numCreated + " for " + dest.getHost() + ":"
+ dest.getPort() + " using protocol "
+ dest.getRequestFormatType().getCode() + " after " + duration
+ " ms.");

// check buffer sizes--you often don't get out what you put in!
if(socketChannel.socket().getReceiveBufferSize() != this.socketBufferSize)
logger.debug("Requested socket receive buffer size was " + this.socketBufferSize
+ " bytes but actual size is "
+ socketChannel.socket().getReceiveBufferSize() + " bytes.");

if(socketChannel.socket().getSendBufferSize() != this.socketBufferSize)
logger.debug("Requested socket send buffer size was " + this.socketBufferSize
+ " bytes but actual size is "
+ socketChannel.socket().getSendBufferSize() + " bytes.");

ClientRequestSelectorManager selectorManager = selectorManagers[counter.getAndIncrement()
% selectorManagers.length];

Selector selector = selectorManager.getSelector();
clientRequestExecutor = new ClientRequestExecutor(selector,
socketChannel,
socketBufferSize);
BlockingClientRequest<String> clientRequest = new BlockingClientRequest<String>(new ProtocolNegotiatorClientRequest(dest.getRequestFormatType()),
this.getTimeout());
clientRequestExecutor.addClientRequest(clientRequest);

selectorManager.registrationQueue.add(clientRequestExecutor);
selector.wakeup();

// Block while we wait for protocol negotiation to complete. May
// throw interrupted exception
clientRequest.await();

// Either returns uninteresting token, or throws exception if
// protocol negotiation failed.
clientRequest.getResult();
} catch(Exception e) {
// Don't forget to close the socket before we throw our exception or
// they'll leak :(
try {
socketChannel.close();
} catch(Exception ex) {
if(logger.isEnabledFor(Level.WARN))
logger.warn(ex, ex);
// Make sure not to leak socketChannels
if(socketChannel != null) {
try {
socketChannel.close();
} catch(Exception ex) {
if(logger.isEnabledFor(Level.WARN))
logger.warn(ex, ex);
}
}
// If clientRequestExector is not null, some additional clean up may
// be warranted. However, clientRequestExecutor.close(), the
// "obvious" clean up, is not safe to call here. This is because
// .close() checks in a resource to the KeyedResourcePool that was
// never checked out.

throw e;
}
Expand Down
Expand Up @@ -162,6 +162,13 @@ public ClientRequestExecutor checkout(SocketDestination destination) {
try {
clientRequestExecutor = queuedPool.checkout(destination);
} catch(Exception e) {
// If this exception caught here is from the nonBlockingPut call
// within KeyedResourcePool.attemptGrow(), then there is the chance
// a ClientRequestExector resource will be leaked. Cannot safely
// deal with this here since clientRequestExecutor is not assigned
// in this catch. Even if it was, clientRequestExecutore.close()
// checks in the SocketDestination resource and so is not safe to
// call.
throw new UnreachableStoreException("Failure while checking out socket for "
+ destination + ": ", e);
} finally {
Expand Down
2 changes: 2 additions & 0 deletions src/java/voldemort/utils/pool/KeyedResourcePool.java
Expand Up @@ -446,6 +446,8 @@ public <K> boolean attemptGrow(K key, ResourceFactory<K, V> objectFactory) throw
}
}
} catch(Exception e) {
// If nonBlockingPut throws an exception, then we could leak
// the resource created by objectFactory.create().
this.size.decrementAndGet();
throw e;
}
Expand Down

0 comments on commit 6246d4e

Please sign in to comment.