From b376a975bb3a0c69f999dcf75525d81ddf652ba9 Mon Sep 17 00:00:00 2001 From: Sorabh Hamirwasia Date: Wed, 11 Oct 2017 15:57:21 -0700 Subject: [PATCH 1/2] DRILL-5874: NPE in AnonWebUserConnection.cleanupSession() --- .../exec/server/rest/DrillRestServer.java | 36 ++++- .../exec/server/rest/WebSessionResources.java | 15 +- .../exec/server/rest/WebUserConnection.java | 7 + .../server/rest/WebSessionResourcesTest.java | 146 ++++++++++++++++++ 4 files changed, 194 insertions(+), 10 deletions(-) create mode 100644 exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java index 6eb47e6592c..907092ee0d7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java @@ -29,6 +29,9 @@ import freemarker.cache.WebappTemplateLoader; import freemarker.core.HTMLOutputFormat; import freemarker.template.Configuration; +import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultChannelPromise; +import io.netty.util.concurrent.EventExecutor; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.memory.BufferAllocator; @@ -108,10 +111,17 @@ public DrillRestServer(final WorkManager workManager, final ServletContext servl provider.setMapper(workManager.getContext().getLpPersistence().getMapper()); register(provider); + // Get an EventExecutor out of the BitServer EventLoopGroup to notify listeners for WebUserConnection. For + // actual connections between Drillbits this EventLoopGroup is used to handle network related events. Though + // there is no actual network connection associated with WebUserConnection but we need a CloseFuture in + // WebSessionResources, so we are using EvenExecutor from network EventLoopGroup pool. + final EventExecutor executor = workManager.getContext().getBitLoopGroup().next(); + register(new AbstractBinder() { @Override protected void configure() { bind(workManager).to(WorkManager.class); + bind(executor).to(EventExecutor.class); bind(workManager.getContext().getLpPersistence().getMapper()).to(ObjectMapper.class); bind(workManager.getContext().getStoreProvider()).to(PersistentStoreProvider.class); bind(workManager.getContext().getStorage()).to(StoragePluginRegistry.class); @@ -159,6 +169,9 @@ public static class AuthWebUserConnectionProvider implements Factory listener, QueryWritableBatch result } } + /** + * Returns a DefaultChannelPromise which doesn't have reference to any actual channel but has an EventExecutor + * associated with it. In this case we use EventExecutor out of BitServer EventLoopGroup. Since there is no actual + * connection established using this class, hence the close event will never be fired by underlying layer and close + * future is set only when the WebSessionResources are closed. + * @return + */ @Override public ChannelFuture getChannelClosureFuture() { return webSessionResources.getCloseFuture(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java new file mode 100644 index 00000000000..e7496725be4 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.server.rest; + +import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultChannelPromise; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.rpc.TransportCheck; +import org.apache.drill.exec.rpc.user.UserSession; +import org.junit.Test; + +import java.net.SocketAddress; +import java.util.concurrent.CountDownLatch; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class WebSessionResourcesTest { + //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WebSessionResourcesTest.class); + + private WebSessionResources webSessionResources; + + private boolean listenerComplete; + + private CountDownLatch latch; + + private EventExecutor executor; + + + private class TestClosedListener implements GenericFutureListener> { + @Override + public void operationComplete(Future future) throws Exception { + listenerComplete = true; + latch.countDown(); + } + } + + @Test + public void testChannelPromiseWithNullExecutor() throws Exception { + try { + ChannelPromise closeFuture = new DefaultChannelPromise(null); + webSessionResources = new WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class), mock + (UserSession.class), closeFuture); + webSessionResources.close(); + fail(); + } catch (Exception e) { + assertTrue(e instanceof NullPointerException); + verify(webSessionResources.getAllocator()).close(); + verify(webSessionResources.getSession()).close(); + } + } + + @Test + public void testChannelPromiseWithValidExecutor() throws Exception { + try { + EventExecutor mockExecutor = mock(EventExecutor.class); + ChannelPromise closeFuture = new DefaultChannelPromise(null, mockExecutor); + webSessionResources = new WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class), mock + (UserSession.class), closeFuture); + webSessionResources.close(); + verify(webSessionResources.getAllocator()).close(); + verify(webSessionResources.getSession()).close(); + verify(mockExecutor).inEventLoop(); + verify(mockExecutor).execute(any(Runnable.class)); + assertTrue(webSessionResources.getCloseFuture() == null); + assertTrue(!listenerComplete); + } catch (Exception e) { + fail(); + } + } + + @Test + public void testDoubleClose() throws Exception { + try { + ChannelPromise closeFuture = new DefaultChannelPromise(null, mock(EventExecutor.class)); + webSessionResources = new WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class), mock + (UserSession.class), closeFuture); + webSessionResources.close(); + + verify(webSessionResources.getAllocator()).close(); + verify(webSessionResources.getSession()).close(); + assertTrue(webSessionResources.getCloseFuture() == null); + + webSessionResources.close(); + } catch (Exception e) { + fail(); + } + } + + @Test + public void testCloseWithListener() throws Exception { + try { + // Assign latch, executor and closeListener for this test case + GenericFutureListener> closeListener = new TestClosedListener(); + latch = new CountDownLatch(1); + executor = TransportCheck.createEventLoopGroup(1, "Test-Thread").next(); + ChannelPromise closeFuture = new DefaultChannelPromise(null, executor); + + // create WebSessionResources with above ChannelPromise to notify listener + webSessionResources = new WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class), + mock(UserSession.class), closeFuture); + + // Add the Test Listener to close future + assertTrue(!listenerComplete); + closeFuture.addListener(closeListener); + + // Close the WebSessionResources + webSessionResources.close(); + + // Verify the states + verify(webSessionResources.getAllocator()).close(); + verify(webSessionResources.getSession()).close(); + assertTrue(webSessionResources.getCloseFuture() == null); + + // Since listener will be invoked so test should not wait forever + latch.await(); + assertTrue(listenerComplete); + } catch (Exception e) { + fail(); + } finally { + listenerComplete = false; + executor.shutdownGracefully(); + } + } +} \ No newline at end of file From ecb94f6b0c56aaeea014aa16a6929d72d03c693b Mon Sep 17 00:00:00 2001 From: Sorabh Hamirwasia Date: Sat, 21 Oct 2017 11:28:39 -0700 Subject: [PATCH 2/2] DRILL-5874: Updated based on review comments --- .../exec/server/rest/DrillRestServer.java | 11 +++++---- .../exec/server/rest/WebSessionResources.java | 7 +++--- .../exec/server/rest/WebUserConnection.java | 18 +++++++------- .../server/rest/WebSessionResourcesTest.java | 24 ++++++++++++++++++- 4 files changed, 41 insertions(+), 19 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java index 907092ee0d7..15458478df9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java @@ -219,8 +219,8 @@ public WebUserConnection provide() { // Create a dummy close future which is needed by Foreman only. Foreman uses this future to add a close // listener to known about channel close event from underlying layer. We use this future to notify Foreman - // listeners when the Web connection between Web Client and WebServer is closed. This will help Foreman to cancel - // all the running queries for this Web Client. + // listeners when the Web session (not connection) between Web Client and WebServer is closed. This will help + // Foreman to cancel all the running queries for this Web Client. final ChannelPromise closeFuture = new DefaultChannelPromise(null, executor); // Create a WebSessionResource instance which owns the lifecycle of all the session resources. @@ -283,9 +283,10 @@ public WebUserConnection provide() { } // Create a dummy close future which is needed by Foreman only. Foreman uses this future to add a close - // listener to known about channel close event from underlying layer. We use this future to notify Foreman - // listeners when the Web connection between Web Client and WebServer is closed. This will help Foreman to cancel - // all the running queries if at all for this Web Client. + // listener to known about channel close event from underlying layer. + // + // The invocation of this close future is no-op as it will be triggered after query completion in unsecure case. + // But we need this close future as it's expected by Foreman. final ChannelPromise closeFuture = new DefaultChannelPromise(null, executor); final WebSessionResources webSessionResources = new WebSessionResources(sessionAllocator, remoteAddress, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java index 1328fe5949d..2ca457c02e1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java @@ -76,9 +76,10 @@ public void close() { // Notify all the listeners of this closeFuture for failure events so that listeners can do cleanup related to this // WebSession. This will be called after every query execution by AnonymousWebUserConnection::cleanupSession and - // for authenticated user it is called Session is destroyed. - // In case when all queries are successfully completed it's a no-op operation whereas in cases - // when there are queries still running for this session, it will help listener (Foreman) to cancel the queries. + // for authenticated user it is called when session is invalidated. + // For authenticated user it will cancel the in-flight queries based on session invalidation. Whereas for + // unauthenticated user it's a no-op since there is no session associated with it. We don't have mechanism currently + // to call this close future upon Http connection close. if (closeFuture != null) { closeFuture.setFailure(new ChannelClosedException("Http connection is closed by Web Client")); closeFuture = null; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java index af72df94833..f46b5e5e69b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java @@ -42,9 +42,14 @@ import java.util.Set; /** - * WebUserConnectionWrapper which represents the UserClientConnection for the WebUser submitting the query. It provides - * access to the UserSession executing the query. There is no actual physical channel corresponding to this connection - * wrapper. + * WebUserConnectionWrapper which represents the UserClientConnection between WebServer and Foreman, for the WebUser + * submitting the query. It provides access to the UserSession executing the query. There is no actual physical + * channel corresponding to this connection wrapper. + * + * It returns a close future with no actual underlying {@link io.netty.channel.Channel} associated with it but do have an + * EventExecutor out of BitServer EventLoopGroup. Since there is no actual connection established using this class, + * hence the close event will never be fired by underlying layer and close future is set only when the + * {@link WebSessionResources} are closed. */ public class WebUserConnection extends AbstractDisposableUserClientConnection implements ConnectionThrottle { @@ -124,13 +129,6 @@ public void sendData(RpcOutcomeListener listener, QueryWritableBatch result } } - /** - * Returns a DefaultChannelPromise which doesn't have reference to any actual channel but has an EventExecutor - * associated with it. In this case we use EventExecutor out of BitServer EventLoopGroup. Since there is no actual - * connection established using this class, hence the close event will never be fired by underlying layer and close - * future is set only when the WebSessionResources are closed. - * @return - */ @Override public ChannelFuture getChannelClosureFuture() { return webSessionResources.getCloseFuture(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java index e7496725be4..bb990de6ef2 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java @@ -36,6 +36,10 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +/** + * Validates {@link WebSessionResources} close works as expected w.r.t {@link io.netty.channel.AbstractChannel.CloseFuture} + * associated with it. + */ public class WebSessionResourcesTest { //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WebSessionResourcesTest.class); @@ -47,7 +51,7 @@ public class WebSessionResourcesTest { private EventExecutor executor; - + // A close listener added in close future in one of the test to see if it's invoked correctly. private class TestClosedListener implements GenericFutureListener> { @Override public void operationComplete(Future future) throws Exception { @@ -56,6 +60,11 @@ public void operationComplete(Future future) throws Exception { } } + /** + * Validates {@link WebSessionResources#close()} throws NPE when closefuture passed to WebSessionResources doesn't + * have a valid channel and EventExecutor associated with it. + * @throws Exception + */ @Test public void testChannelPromiseWithNullExecutor() throws Exception { try { @@ -71,6 +80,10 @@ public void testChannelPromiseWithNullExecutor() throws Exception { } } + /** + * Validates successful {@link WebSessionResources#close()} with valid CloseFuture and other parameters. + * @throws Exception + */ @Test public void testChannelPromiseWithValidExecutor() throws Exception { try { @@ -90,6 +103,10 @@ public void testChannelPromiseWithValidExecutor() throws Exception { } } + /** + * Validates double call to {@link WebSessionResources#close()} doesn't throw any exception. + * @throws Exception + */ @Test public void testDoubleClose() throws Exception { try { @@ -108,6 +125,11 @@ public void testDoubleClose() throws Exception { } } + /** + * Validates successful {@link WebSessionResources#close()} with valid CloseFuture and {@link TestClosedListener} + * getting invoked which is added to the close future. + * @throws Exception + */ @Test public void testCloseWithListener() throws Exception { try {