From 75baf01e83e7db2cfd60850e9facf535cf10d887 Mon Sep 17 00:00:00 2001 From: Stephen Mallette Date: Tue, 13 Sep 2016 18:10:09 -0400 Subject: [PATCH] Improved session cleanup on client close. While not a perfect implementation, a long-running job blocking a close request from the client will now at least get an attempt at interruption rather than consuming the thread indefinitely. TINKERPOP-1442 --- CHANGELOG.asciidoc | 1 + .../tinkerpop/gremlin/driver/Connection.java | 2 +- .../groovy/engine/GremlinExecutor.java | 2 +- .../gremlin/server/op/session/Session.java | 12 +++++++ .../server/op/session/SessionOpProcessor.java | 9 +++++ .../server/GremlinDriverIntegrateTest.java | 2 +- .../server/GremlinServerIntegrateTest.java | 4 +-- .../GremlinServerSessionIntegrateTest.java | 33 +++++++++++++++++++ 8 files changed, 60 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 4d990ee4a62..a9dae9dd669 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -26,6 +26,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima TinkerPop 3.1.5 (Release Date: NOT OFFICIALLY RELEASED YET) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +* Improved session cleanup when a close is triggered by the client. * Removed the `appveyor.yml` file as the AppVeyor build is no longer enabled by Apache Infrastructure. * Fixed a bug in `RangeByIsCountStrategy` which didn't use the `NotStep` properly. 
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java index 22e48fe3449..220ad42c7d7 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java @@ -92,7 +92,7 @@ public Connection(final URI uri, final ConnectionPool pool, final int maxInProce connectionLabel = String.format("Connection{host=%s}", pool.host); - if (cluster.isClosing()) throw new IllegalStateException("Cannot open a connection while the cluster after close() is called"); + if (cluster.isClosing()) throw new IllegalStateException("Cannot open a connection with the cluster after close() is called"); final Bootstrap b = this.cluster.getFactory().createBootstrap(); try { diff --git a/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java b/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java index da12e1e1121..785442a094f 100644 --- a/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java +++ b/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java @@ -312,7 +312,7 @@ public CompletableFuture eval(final String script, final String language if (root instanceof InterruptedException) { lifeCycle.getAfterTimeout().orElse(afterTimeout).accept(bindings); evaluationFuture.completeExceptionally(new TimeoutException( - String.format("Script evaluation exceeded the configured 'scriptEvaluationTimeout' threshold of %s ms for request [%s]: %s", scriptEvalTimeOut, script, root.getMessage()))); + String.format("Script evaluation exceeded the configured 'scriptEvaluationTimeout' threshold of %s ms or evaluation was otherwise cancelled directly for request [%s]: %s", scriptEvalTimeOut, script, root.getMessage()))); } else { 
lifeCycle.getAfterFailure().orElse(afterFailure).accept(bindings, root); evaluationFuture.completeExceptionally(root); diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java index 33b27520a7f..c9bc7c1c9ca 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java @@ -38,6 +38,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** @@ -52,6 +53,7 @@ public class Session { private final ScheduledExecutorService scheduledExecutorService; private final long configuredSessionTimeout; + private AtomicBoolean killing = new AtomicBoolean(false); private AtomicReference kill = new AtomicReference<>(); /** @@ -104,6 +106,10 @@ public String getSessionId() { return session; } + public boolean acceptingRequests() { + return !killing.get(); + } + public void touch() { // if the task of killing is cancelled successfully then reset the session monitor. otherwise this session // has already been killed and there's nothing left to do with this session. @@ -134,6 +140,8 @@ public void manualKill() { * Kills the session and rollback any uncommitted changes on transactional graphs. */ public synchronized void kill() { + killing.set(true); + // if the session has already been removed then there's no need to do this process again. it's possible that // the manuallKill and the kill future could have both called kill at roughly the same time. 
this prevents // kill() from being called more than once @@ -157,6 +165,10 @@ public synchronized void kill() { } } }); + + // prevent any additional requests from processing now that the mass rollback has been completed + executor.shutdownNow(); + sessions.remove(session); logger.info("Session {} closed", session); } diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java index 3497169c80c..bec0c558288 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java @@ -147,6 +147,15 @@ protected void evalOp(final Context context) throws OpProcessorException { final RequestMessage msg = context.getRequestMessage(); final Session session = getSession(context, msg); + // check if the session is still accepting requests - if not block further requests + if (!session.acceptingRequests()) { + final String sessionClosedMessage = String.format("Session %s is no longer accepting requests as it has been closed", + session.getSessionId()); + final ResponseMessage response = ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR) + .statusMessage(sessionClosedMessage).create(); + throw new OpProcessorException(sessionClosedMessage, response); + } + // place the session on the channel context so that it can be used during serialization. in this way // the serialization can occur on the same thread used to execute the gremlin within the session. this // is important given the threadlocal nature of Graph implementation transactions. 
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java index 73142434f68..1a04b6b46c5 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java @@ -1258,7 +1258,7 @@ private void assertFutureTimeout(final CompletableFuture> futureFir { final Throwable root = ExceptionUtils.getRootCause(ex); assertThat(root, instanceOf(ResponseException.class)); - assertThat(root.getMessage(), startsWith("Script evaluation exceeded the configured 'scriptEvaluationTimeout' threshold of 250 ms for request")); + assertThat(root.getMessage(), startsWith("Script evaluation exceeded the configured 'scriptEvaluationTimeout' threshold of 250 ms")); } } } diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java index 2f091d98143..0f0cdae3997 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java @@ -543,7 +543,7 @@ public void shouldNotThrowNoSuchElementException() throws Exception { public void shouldReceiveFailureTimeOutOnScriptEval() throws Exception { try (SimpleClient client = new WebSocketClient()){ final List responses = client.submit("Thread.sleep(3000);'some-stuff-that-should not return'"); - assertThat(responses.get(0).getStatus().getMessage(), startsWith("Script evaluation exceeded the configured 'scriptEvaluationTimeout' threshold of 200 ms for request")); + assertThat(responses.get(0).getStatus().getMessage(), startsWith("Script evaluation exceeded the configured 
'scriptEvaluationTimeout' threshold of 200 ms")); // validate that we can still send messages to the server assertEquals(2, ((List) client.submit("1+1").get(0).getResult().getData()).get(0).intValue()); @@ -559,7 +559,7 @@ public void shouldReceiveFailureTimeOutOnScriptEvalUsingOverride() throws Except .addArg(Tokens.ARGS_GREMLIN, "Thread.sleep(3000);'some-stuff-that-should not return'") .create(); final List responses = client.submit(msg); - assertThat(responses.get(0).getStatus().getMessage(), startsWith("Script evaluation exceeded the configured 'scriptEvaluationTimeout' threshold of 100 ms for request")); + assertThat(responses.get(0).getStatus().getMessage(), startsWith("Script evaluation exceeded the configured 'scriptEvaluationTimeout' threshold of 100 ms")); // validate that we can still send messages to the server assertEquals(2, ((List) client.submit("1+1").get(0).getResult().getData()).get(0).intValue()); diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java index 8b34038e9a7..99b3a1b9f6c 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java @@ -50,6 +50,7 @@ import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.core.StringStartsWith.startsWith; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -108,6 +109,38 @@ public Settings overrideSettings(final Settings settings) { return settings; } + @Test + public void shouldBlockAdditionalRequestsDuringClose() throws Exception { + // this is sorta cobbled together a bit given limits/rules about how you can use Cluster/Client 
instances. + // basically, we need one to submit the long run job and one to do the close operation that will cancel the + // long run job. it is probably possible to do this with some low-level message manipulation but that's + // probably not necessary + final Cluster cluster1 = Cluster.build().create(); + final Client client1 = cluster1.connect(name.getMethodName()); + client1.submit("1+1").all().join(); + final Cluster cluster2 = Cluster.build().create(); + final Client client2 = cluster2.connect(name.getMethodName()); + client2.submit("1+1").all().join(); + + final ResultSet rs = client1.submit("Thread.sleep(10000);1+1"); + + client2.close(); + + try { + rs.all().join(); + fail("The close of the session on client2 should have interrupted the script sent on client1"); + } catch (Exception ex) { + final Throwable root = ExceptionUtils.getRootCause(ex); + assertThat(root.getMessage(), startsWith("Script evaluation exceeded the configured 'scriptEvaluationTimeout' threshold of 30000 ms or evaluation was otherwise cancelled directly for request")); + } + + client1.close(); + + cluster1.close(); + cluster2.close(); + } + + @Test public void shouldRollbackOnEvalExceptionForManagedTransaction() throws Exception { assumeNeo4jIsPresent();