Skip to content

Commit

Permalink
TINKERPOP-2169 Fix connection leak on CorruptedFrameException
Browse files Browse the repository at this point in the history
Connection leak is fixed by forcefully destroying connection when the underlying
Netty channel is inactive. This commit also removes marking the host as
unavailable when a connection is abruptly disconnected.
  • Loading branch information
Divij Vaidya committed Mar 8, 2019
1 parent 5ac3add commit 2cd84ff
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 56 deletions.
Expand Up @@ -365,7 +365,7 @@ protected Map<String,String> makeDefaultAliasMap(final String graphOrTraversalSo
*/
public final static class ClusteredClient extends Client {

private ConcurrentMap<Host, ConnectionPool> hostConnectionPools = new ConcurrentHashMap<>();
protected ConcurrentMap<Host, ConnectionPool> hostConnectionPools = new ConcurrentHashMap<>();
private final AtomicReference<CompletableFuture<Void>> closing = new AtomicReference<>(null);

ClusteredClient(final Cluster cluster, final Client.Settings settings) {
Expand Down
Expand Up @@ -18,20 +18,14 @@
*/
package org.apache.tinkerpop.gremlin.driver;

import io.netty.handler.codec.CodecException;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.xml.ws.Response;
import java.io.IOException;
import java.net.URI;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -80,7 +74,6 @@ final class Connection {
public final AtomicInteger borrowed = new AtomicInteger(0);
private final AtomicReference<Class<Channelizer>> channelizerClass = new AtomicReference<>(null);

private volatile boolean isDead = false;
private final int maxInProcess;

private final String connectionLabel;
Expand Down Expand Up @@ -133,8 +126,14 @@ public int availableInProcess() {
return Math.max(0, maxInProcess - pending.size());
}

/**
* Consider a connection as dead if the underlying channel is not connected.
*
* Note: A dead connection does not necessarily imply that the server is unavailable. Additional checks
* should be performed to mark the server host as unavailable.
*/
public boolean isDead() {
return isDead;
return (channel !=null && !channel.isActive());
}

boolean isClosing() {
Expand Down Expand Up @@ -205,11 +204,7 @@ public ChannelPromise write(final RequestMessage requestMessage, final Completab
if (logger.isDebugEnabled())
logger.debug(String.format("Write on connection %s failed", thisConnection.getConnectionInfo()), f.cause());

// if there is a ResponseException the write failed because of something to do with the server
// or client side serialization - neither of these things mean that the host is dead. the
// connection should be reusable.
thisConnection.isDead = ExceptionUtils.indexOfThrowable(f.cause(), ResponseException.class) == -1;
thisConnection.returnToPool();
handleConnectionCleanupOnError(thisConnection, f.cause());

cluster.executor().submit(() -> future.completeExceptionally(f.cause()));
} else {
Expand All @@ -225,24 +220,11 @@ public ChannelPromise write(final RequestMessage requestMessage, final Completab

// the callback for when the read failed. a failed read means the request went to the server
// and came back with a server-side error of some sort. it means the server is responsive
// so this isn't going to be like a dead host situation which is handled above on a failed
// so this isn't going to be like a potentially dead host situation which is handled above on a failed
// write operation.
//
// in the event of an IOException (typically means that the Connection might have
// been closed from the server side - this is typical in situations like when a request is
// sent that exceeds maxContentLength and the server closes the channel on its side) or other
// exceptions that indicate a non-recoverable state for the Connection object
// (a netty CorruptedFrameException is a good example of that), the Connection cannot simply
// be returned to the pool as future uses will end with refusal from the server and make it
// appear as a dead host as the write will not succeed. instead, the Connection needs to be
// replaced in these scenarios which destroys the dead channel on the client and allows a new
// one to be reconstructed.
readCompleted.exceptionally(t -> {
if (t instanceof IOException || t instanceof CodecException) {
if (pool != null) pool.replaceConnection(thisConnection);
} else {
thisConnection.returnToPool();
}

handleConnectionCleanupOnError(thisConnection, t);

// close was signaled in closeAsync() but there were pending messages at that time. attempt
// the shutdown if the returned result cleared up the last pending message
Expand Down Expand Up @@ -291,6 +273,23 @@ public void returnToPool() {
}
}

/*
* In the event of an IOException (typically means that the Connection might have been closed from the server side
* - this is typical in situations like when a request is sent that exceeds maxContentLength and the server closes
* the channel on its side) or other exceptions that indicate a non-recoverable state for the Connection object
* (a netty CorruptedFrameException is a good example of that), the Connection cannot simply be returned to the
* pool as future uses will end with refusal from the server and make it appear as a dead host as the write will
* not succeed. Instead, the Connection needs to be replaced in these scenarios which destroys the dead channel
* on the client and allows a new one to be reconstructed.
*/
private void handleConnectionCleanupOnError(final Connection thisConnection, final Throwable t) {
if (thisConnection.isDead()) {
if (pool != null) pool.replaceConnection(thisConnection);
} else {
thisConnection.returnToPool();
}
}

private boolean isOkToClose() {
return pending.isEmpty() || (channel !=null && !channel.isOpen()) || !pool.host.isAvailable();
}
Expand All @@ -309,6 +308,8 @@ private synchronized void shutdown(final CompletableFuture<Void> future) {
// be called once. once shutdown is initiated, it shouldn't be executed a second time or else it sends more
// messages at the server and leads to ugly log messages over there.
if (shutdownInitiated.compareAndSet(false, true)) {
final String connectionInfo = this.getConnectionInfo();

// maybe this should be delegated back to the Client implementation??? kinda weird to instanceof here.....
if (client instanceof Client.SessionedClient) {
final boolean forceClose = client.getSettings().getSession().get().isForceClosed();
Expand Down Expand Up @@ -340,10 +341,14 @@ private synchronized void shutdown(final CompletableFuture<Void> future) {

final ChannelPromise promise = channel.newPromise();
promise.addListener(f -> {
if (f.cause() != null)
if (f.cause() != null) {
future.completeExceptionally(f.cause());
else
} else {
if (logger.isDebugEnabled())
logger.debug("{} destroyed successfully.", connectionInfo);

future.complete(null);
}
});

channel.close(promise);
Expand All @@ -352,7 +357,7 @@ private synchronized void shutdown(final CompletableFuture<Void> future) {

public String getConnectionInfo() {
return String.format("Connection{host=%s, isDead=%s, borrowed=%s, pending=%s}",
pool.host, isDead, borrowed, pending.size());
pool.host, isDead(), borrowed, pending.size());
}

@Override
Expand Down
Expand Up @@ -98,7 +98,7 @@ public ConnectionPool(final Host host, final Client client, final Optional<Integ
// ok if we don't get it initialized here - when a request is attempted in a connection from the
// pool it will try to create new connections as needed.
logger.debug("Could not initialize connections in pool for {} - pool size at {}", host, this.connections.size());
considerUnavailable();
considerHostUnavailable();
}

this.open = new AtomicInteger(connections.size());
Expand Down Expand Up @@ -171,9 +171,10 @@ public void returnConnection(final Connection connection) throws ConnectionExcep
if (isClosed()) throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");

final int borrowed = connection.borrowed.decrementAndGet();

if (connection.isDead()) {
logger.debug("Marking {} as dead", this.host);
considerUnavailable();
this.replaceConnection(connection);
} else {
if (bin.contains(connection) && borrowed == 0) {
logger.debug("{} is already in the bin and it has no inflight requests so it is safe to close", connection);
Expand Down Expand Up @@ -230,6 +231,13 @@ public synchronized CompletableFuture<Void> closeAsync() {
return future;
}

/**
* Required for testing
*/
int numConnectionsWaitingToCleanup() {
return bin.size();
}

private CompletableFuture<Void> killAvailableConnections() {
final List<CompletableFuture<Void>> futures = new ArrayList<>(connections.size());
for (Connection connection : connections) {
Expand Down Expand Up @@ -293,7 +301,7 @@ private boolean addConnectionIfUnderMaximum() {
} catch (ConnectionException ce) {
logger.debug("Connections were under max, but there was an error creating the connection.", ce);
open.decrementAndGet();
considerUnavailable();
considerHostUnavailable();
return false;
}

Expand All @@ -317,16 +325,18 @@ private boolean destroyConnection(final Connection connection) {

private void definitelyDestroyConnection(final Connection connection) {
// only add to the bin for future removal if its not already there.
if (!bin.contains(connection)) {
if (!bin.contains(connection) && !connection.isClosing()) {
bin.add(connection);
connections.remove(connection);
open.decrementAndGet();
}

// only close the connection for good once it is done being borrowed
if (connection.borrowed.get() == 0 && bin.remove(connection)) {
connection.closeAsync();
logger.debug("{} destroyed", connection.getConnectionInfo());
// only close the connection for good once it is done being borrowed or when it is dead
if (connection.isDead() || connection.borrowed.get() == 0) {
if(bin.remove(connection)) {
connection.closeAsync();
logger.debug("{} destroyed", connection.getConnectionInfo());
}
}
}

Expand Down Expand Up @@ -372,14 +382,13 @@ private Connection waitForConnection(final long timeout, final TimeUnit unit) th

// if we timeout borrowing a connection that might mean the host is dead (or the timeout was super short).
// either way supply a function to reconnect
this.considerUnavailable();
this.considerHostUnavailable();

throw new TimeoutException();
throw new TimeoutException("Timed-out waiting for connection on " + host + " - possibly unavailable");
}

private void considerUnavailable() {
// called when a connection is "dead" such that a "dead" connection means the host itself is basically
// "dead". that's probably ok for now, but this decision should likely be more flexible.
public void considerHostUnavailable() {
// called when a connection is "dead" due to a non-recoverable error.
host.makeUnavailable(this::tryReconnect);

// if the host is unavailable then we should release the connections
Expand Down
Expand Up @@ -71,6 +71,8 @@ void makeAvailable() {
void makeUnavailable(final Function<Host, Boolean> reconnect) {
isAvailable = false;

logger.warn("Marking {} as unavailable. Trying to reconnect.", this);

// only do a connection re-attempt if one is not already in progress
if (retryInProgress.compareAndSet(Boolean.FALSE, Boolean.TRUE)) {
retryThread = this.cluster.executor().scheduleAtFixedRate(() -> {
Expand Down
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.driver;

import io.netty.handler.codec.CorruptedFrameException;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.tinkerpop.gremlin.server.AbstractGremlinServerIntegrationTest;
import org.apache.tinkerpop.gremlin.server.TestClientFactory;
import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;

public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrationTest {
private Log4jRecordingAppender recordingAppender = null;
private Level previousLogLevel;

@Before
public void setupForEachTest() {
recordingAppender = new Log4jRecordingAppender();
final Logger rootLogger = Logger.getRootLogger();

if (name.getMethodName().equals("shouldCloseConnectionDeadDueToUnRecoverableError")) {
final org.apache.log4j.Logger connectionLogger = org.apache.log4j.Logger.getLogger(Connection.class);
previousLogLevel = connectionLogger.getLevel();
connectionLogger.setLevel(Level.DEBUG);
}

rootLogger.addAppender(recordingAppender);
}

@After
public void teardownForEachTest() {
final Logger rootLogger = Logger.getRootLogger();

if (name.getMethodName().equals("shouldCloseConnectionDeadDueToUnRecoverableError")) {
final org.apache.log4j.Logger connectionLogger = org.apache.log4j.Logger.getLogger(Connection.class);
connectionLogger.setLevel(previousLogLevel);
}

rootLogger.removeAppender(recordingAppender);
}

/**
* Reproducer for TINKERPOP-2169
*/
@Test
public void shouldCloseConnectionDeadDueToUnRecoverableError() throws Exception {
// Set a low value of maxContentLength to intentionally trigger CorruptedFrameException
final Cluster cluster = TestClientFactory.build()
.maxContentLength(64)
.minConnectionPoolSize(1)
.maxConnectionPoolSize(2)
.create();
final Client.ClusteredClient client = cluster.connect();

try {
// Add the test data so that the g.V() response could exceed maxContentLength
client.submit("g.inject(1).repeat(__.addV()).times(10).count()").all().get();
try {
client.submit("g.V().fold()").all().get();

fail("Should throw an exception.");
} catch (Exception re) {
assertThat(re.getCause() instanceof CorruptedFrameException, is(true));
}

// Assert that the host has not been marked unavailable
Assert.assertEquals(1, cluster.availableHosts().size());

// Assert that there is no connection leak and all connections have been closed
Assert.assertEquals(0, client.hostConnectionPools.values().stream()
.findFirst().get()
.numConnectionsWaitingToCleanup());

// Assert that the connection has been destroyed. Specifically check for the string with
// isDead=true indicating the connection that was closed due to CorruptedFrameException.
assertThat(recordingAppender.logContainsAny("^(?!.*(isDead=false)).*isDead=true.*destroyed successfully.$"), is(true));
} finally {
cluster.close();
}
}
}

0 comments on commit 2cd84ff

Please sign in to comment.