From 26dcf43647117c1729cc7a7f010de2774785c183 Mon Sep 17 00:00:00 2001 From: Dave Barnes Date: Thu, 10 Aug 2017 17:11:50 -0700 Subject: [PATCH] GEODE-3416: Reduce synchronization blockages in SocketCloser. Remove synchronization blocks around HashMap. Replace that implementation with simpler ThreadPool that is not unbounded and does not grow as the number of remoteAddress (clients/peers) are added --- .../cache/tier/sockets/CacheClientProxy.java | 47 +---- .../geode/internal/net/SocketCloser.java | 165 ++++++++++-------- .../internal/net/SocketCloserJUnitTest.java | 155 +++++----------- 3 files changed, 141 insertions(+), 226 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java index d7e354887b00..34f232db78ff 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java @@ -181,11 +181,7 @@ public class CacheClientProxy implements ClientSession { * True if we are connected to a client. */ private volatile boolean connected = false; - // /** - // * A string representing interest in all keys - // */ - // protected static final String ALL_KEYS = "ALL_KEYS"; - // + /** * True if a marker message is still in the ha queue. */ @@ -459,47 +455,6 @@ public ClientProxyMembershipID getProxyID() { return this.proxyID; } - // the following code was commented out simply because it was not used - // /** - // * Determines if the proxy represents the client host (and only the host, not - // * necessarily the exact VM running on the host) - // * - // * @return Whether the proxy represents the client host - // */ - // protected boolean representsClientHost(String clientHost) - // { - // // [bruce] TODO BUGBUGBUG: this should compare InetAddresses, not Strings - // return this._remoteHostAddress.equals(clientHost); - // } - - // protected boolean representsClientVM(DistributedMember remoteMember) - // { - // // logger.warn("Is input port " + clientPort + " contained in " + - // // logger.warn("Does input host " + clientHost + " equal " + - // // this._remoteHostAddress+ ": " + representsClientHost(clientHost)); - // // logger.warn("representsClientVM: " + - // // (representsClientHost(clientHost) && containsPort(clientPort))); - // return (proxyID.getDistributedMember().equals(remoteMember)); - // } - - // /** - // * Determines if the CacheClientUpdater proxied by this instance is listening - // * on the input clientHost and clientPort - // * - // * @param clientHost - // * The host name of the client to compare - // * @param clientPort - // * The port number of the client to compare - // * - // * @return Whether the CacheClientUpdater proxied by this instance is - // * listening on the input clientHost and clientPort - // */ - // protected boolean representsCacheClientUpdater(String clientHost, - // int clientPort) - // { - // return (clientPort == this._socket.getPort() && representsClientHost(clientHost)); - // } - protected boolean isMember(ClientProxyMembershipID memberId) { return this.proxyID.equals(memberId); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java index 6d86fd896566..0a9a90356989 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java @@ -14,11 +14,17 @@ */ package org.apache.geode.internal.net; +import org.apache.geode.SystemFailure; +import org.apache.geode.internal.logging.LogService; +import org.apache.geode.internal.logging.LoggingThreadGroup; +import org.apache.logging.log4j.Logger; + import java.io.IOException; import java.net.Socket; -import java.util.HashMap; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; @@ -26,12 +32,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.logging.log4j.Logger; - -import org.apache.geode.SystemFailure; -import org.apache.geode.internal.logging.LogService; -import org.apache.geode.internal.logging.LoggingThreadGroup; - /** * This class allows sockets to be closed without blocking. In some cases we have seen a call of * socket.close block for minutes. This class maintains a thread pool for every other member we have @@ -57,7 +57,7 @@ public class SocketCloser { * will queue up waiting for a thread. */ static final int ASYNC_CLOSE_POOL_MAX_THREADS = - Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 8).intValue(); + Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 4).intValue(); /** * How many milliseconds the synchronous requester waits for the async close to happen. Default is * 0. Prior releases waited 50ms. @@ -66,13 +66,16 @@ public class SocketCloser { Long.getLong("p2p.ASYNC_CLOSE_WAIT_MILLISECONDS", 0).longValue(); - /** map of thread pools of async close threads */ - private final HashMap asyncCloseExecutors = new HashMap<>(); + /** + * map of thread pools of async close threads + */ + private final ConcurrentHashMap asyncCloseExecutors = + new ConcurrentHashMap<>(); private final long asyncClosePoolKeepAliveSeconds; private final int asyncClosePoolMaxThreads; private final long asyncCloseWaitTime; private final TimeUnit asyncCloseWaitUnits; - private boolean closed; + private Boolean closed = Boolean.FALSE; public SocketCloser() { this(ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS, ASYNC_CLOSE_POOL_MAX_THREADS, @@ -96,26 +99,47 @@ public int getMaxThreads() { return this.asyncClosePoolMaxThreads; } - private ThreadPoolExecutor getAsyncThreadExecutor(String address) { - synchronized (asyncCloseExecutors) { - ThreadPoolExecutor pool = asyncCloseExecutors.get(address); - if (pool == null) { - final ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger); - ThreadFactory tf = new ThreadFactory() { - public Thread newThread(final Runnable command) { - Thread thread = new Thread(tg, command); - thread.setDaemon(true); - return thread; - } - }; - BlockingQueue workQueue = new LinkedBlockingQueue(); - pool = new ThreadPoolExecutor(this.asyncClosePoolMaxThreads, this.asyncClosePoolMaxThreads, - this.asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS, workQueue, tf); - pool.allowCoreThreadTimeOut(true); - asyncCloseExecutors.put(address, pool); + private ExecutorService getAsyncThreadExecutor(String address) { + ExecutorService executorService = asyncCloseExecutors.get(address); + if (executorService == null) { + // To be used for pre-1.8 jdk releases. + // executorService = createThreadPoolExecutor(); + + executorService = getWorkStealingPool(asyncClosePoolMaxThreads); + + ExecutorService previousThreadPoolExecutor = + asyncCloseExecutors.putIfAbsent(address, executorService); + + if (previousThreadPoolExecutor != null) { + executorService.shutdownNow(); + return previousThreadPoolExecutor; } - return pool; } + return executorService; + } + + private ExecutorService getWorkStealingPool(int maxParallelThreads) { + return Executors.newWorkStealingPool(maxParallelThreads); + } + + /** + * @deprecated since GEODE 1.3.0. Use @link{getWorkStealingPool} + */ + @Deprecated + private ExecutorService createThreadPoolExecutor() { + final ThreadGroup threadGroup = + LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger); + ThreadFactory threadFactory = new ThreadFactory() { + public Thread newThread(final Runnable command) { + Thread thread = new Thread(threadGroup, command); + thread.setDaemon(true); + return thread; + } + }; + + return new ThreadPoolExecutor(asyncClosePoolMaxThreads, asyncClosePoolMaxThreads, + asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), + threadFactory); } /** @@ -123,19 +147,11 @@ public Thread newThread(final Runnable command) { * longer needed. Currently a thread pool is kept for each address and if you know that an address * no longer needs its pool then you should call this method. */ - public void releaseResourcesForAddress(String address) { - synchronized (asyncCloseExecutors) { - ThreadPoolExecutor pool = asyncCloseExecutors.get(address); - if (pool != null) { - pool.shutdown(); - asyncCloseExecutors.remove(address); - } - } - } - private boolean isClosed() { - synchronized (asyncCloseExecutors) { - return this.closed; + public void releaseResourcesForAddress(String address) { + ExecutorService executorService = asyncCloseExecutors.remove(address); + if (executorService != null) { + executorService.shutdown(); } } @@ -144,35 +160,22 @@ private boolean isClosed() { * called then the asyncClose will be done synchronously. */ public void close() { - synchronized (asyncCloseExecutors) { + synchronized (closed) { if (!this.closed) { this.closed = true; - for (ThreadPoolExecutor pool : asyncCloseExecutors.values()) { - pool.shutdown(); - } - asyncCloseExecutors.clear(); + } else { + return; } } + for (ExecutorService executorService : asyncCloseExecutors.values()) { + executorService.shutdown(); + } + asyncCloseExecutors.clear(); } - private void asyncExecute(String address, Runnable r) { - // Waiting 50ms for the async close request to complete is what the old (close per thread) - // code did. But now that we will not create a thread for every close request - // it seems better to let the thread that requested the close to move on quickly. - // So the default has changed to not wait. The system property p2p.ASYNC_CLOSE_WAIT_MILLISECONDS - // can be set to how many milliseconds to wait. - if (this.asyncCloseWaitTime == 0) { - getAsyncThreadExecutor(address).execute(r); - } else { - Future future = getAsyncThreadExecutor(address).submit(r); - try { - future.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - // We want this code to wait at most 50ms for the close to happen. - // It is ok to ignore these exception and let the close continue - // in the background. - } - } + private Future asyncExecute(String address, Runnable runnableToExecute) { + ExecutorService asyncThreadExecutor = getAsyncThreadExecutor(address); + return asyncThreadExecutor.submit(runnableToExecute); } /** @@ -181,29 +184,30 @@ private void asyncExecute(String address, Runnable r) { * this method may block for a certain amount of time. If it is called after the SocketCloser is * closed then a normal synchronous close is done. * - * @param sock the socket to close + * @param socket the socket to close * @param address identifies who the socket is connected to * @param extra an optional Runnable with stuff to execute in the async thread */ - public void asyncClose(final Socket sock, final String address, final Runnable extra) { - if (sock == null || sock.isClosed()) { + public void asyncClose(final Socket socket, final String address, final Runnable extra) { + if (socket == null || socket.isClosed()) { return; } boolean doItInline = false; try { - synchronized (asyncCloseExecutors) { - if (isClosed()) { + Future submittedTask = null; + synchronized (closed) { + if (closed) { // this SocketCloser has been closed so do a synchronous, inline, close doItInline = true; } else { - asyncExecute(address, new Runnable() { + submittedTask = asyncExecute(address, new Runnable() { public void run() { Thread.currentThread().setName("AsyncSocketCloser for " + address); try { if (extra != null) { extra.run(); } - inlineClose(sock); + inlineClose(socket); } finally { Thread.currentThread().setName("unused AsyncSocketCloser"); } @@ -211,6 +215,9 @@ public void run() { }); } } + if (submittedTask != null) { + waitForFutureTaskWithTimeout(submittedTask); + } } catch (OutOfMemoryError ignore) { // If we can't start a thread to close the socket just do it inline. // See bug 50573. @@ -220,16 +227,28 @@ public void run() { if (extra != null) { extra.run(); } - inlineClose(sock); + inlineClose(socket); } } + private void waitForFutureTaskWithTimeout(Future submittedTask) { + if (this.asyncCloseWaitTime != 0) { + try { + submittedTask.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + // We want this code to wait at most the asyncCloseWaitTime for the close to happen. + // It is ok to ignore these exception and let the close continue + // in the background. + } + } + } /** * Closes the specified socket * * @param sock the socket to close */ + private static void inlineClose(final Socket sock) { // the next two statements are a mad attempt to fix bug // 36041 - segv in jrockit in pthread signaling code. This diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java index 942cad4d6fcd..a8b1d4870699 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java @@ -14,22 +14,21 @@ */ package org.apache.geode.internal.net; -import static org.junit.Assert.*; - -import java.net.Socket; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - +import org.apache.geode.test.junit.categories.UnitTest; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.apache.geode.internal.net.SocketCloser; -import org.apache.geode.test.dunit.Wait; -import org.apache.geode.test.dunit.WaitCriterion; -import org.apache.geode.test.junit.categories.UnitTest; +import java.net.Socket; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; /** * Tests the default SocketCloser. @@ -62,86 +61,49 @@ protected SocketCloser createSocketCloser() { */ @Test public void testAsync() { - final CountDownLatch cdl = new CountDownLatch(1); + final CountDownLatch countDownLatch = new CountDownLatch(1); final AtomicInteger waitingToClose = new AtomicInteger(0); - Runnable r = new Runnable() { - @Override - public void run() { - try { - waitingToClose.incrementAndGet(); - cdl.await(); - } catch (InterruptedException e) { - } - } - }; final int SOCKET_COUNT = 100; - final Socket[] aSockets = new Socket[SOCKET_COUNT]; - for (int i = 0; i < SOCKET_COUNT; i++) { - aSockets[i] = createClosableSocket(); - } - // Schedule a 100 sockets for async close. - // They should all be stuck on cdl. - for (int i = 0; i < SOCKET_COUNT; i++) { - this.socketCloser.asyncClose(aSockets[i], "A", r); - } - // Make sure the sockets have not been closed - for (int i = 0; i < SOCKET_COUNT; i++) { - assertEquals(false, aSockets[i].isClosed()); - } - final Socket[] bSockets = new Socket[SOCKET_COUNT]; - for (int i = 0; i < SOCKET_COUNT; i++) { - bSockets[i] = createClosableSocket(); - } + final int REMOTE_CLIENT_COUNT = 200; + + List trackedSockets = new ArrayList<>(); // Schedule a 100 sockets for async close. - // They should all be stuck on cdl. - for (int i = 0; i < SOCKET_COUNT; i++) { - this.socketCloser.asyncClose(bSockets[i], "B", r); - } - // Make sure the sockets have not been closed - for (int i = 0; i < SOCKET_COUNT; i++) { - assertEquals(false, bSockets[i].isClosed()); + // They should all be stuck on countDownLatch. + for (int i = 0; i < REMOTE_CLIENT_COUNT; i++) { + Socket[] aSockets = new Socket[SOCKET_COUNT]; + String address = i + ""; + for (int j = 0; j < SOCKET_COUNT; j++) { + aSockets[j] = createClosableSocket(); + trackedSockets.add(aSockets[j]); + this.socketCloser.asyncClose(aSockets[j], address, () -> { + try { + waitingToClose.incrementAndGet(); + countDownLatch.await(); + } catch (InterruptedException e) { + } + }); + } } + // close the socketCloser first to verify that the sockets // that have already been scheduled will be still be closed. - this.socketCloser.releaseResourcesForAddress("A"); this.socketCloser.close(); - // Each thread pool (one for A and one for B) has a max of 8 threads. - // So verify that this many are currently waiting on cdl. - { - final int maxThreads = this.socketCloser.getMaxThreads(); - WaitCriterion wc = new WaitCriterion() { - public boolean done() { - return waitingToClose.get() == 2 * maxThreads; - } - - public String description() { - return "expected " + 2 * maxThreads + " waiters but found only " + waitingToClose.get(); - } - }; - Wait.waitForCriterion(wc, 5000, 10, true); - } - // now count down the latch that allows the sockets to close - cdl.countDown(); + countDownLatch.countDown(); // now all the sockets should get closed; use a wait criteria // since a thread pool is doing to closes - { - WaitCriterion wc = new WaitCriterion() { - public boolean done() { - for (int i = 0; i < SOCKET_COUNT; i++) { - if (!aSockets[i].isClosed() || !bSockets[i].isClosed()) { - return false; - } - } - return true; + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> { + boolean areAllClosed = true; + for (Iterator iterator = trackedSockets.iterator(); iterator.hasNext();) { + Socket socket = iterator.next(); + if (socket.isClosed()) { + iterator.remove(); + continue; } - - public String description() { - return "one or more sockets did not close"; - } - }; - Wait.waitForCriterion(wc, 5000, 10, true); - } + areAllClosed = false; + } + return areAllClosed; + }); } /** @@ -150,18 +112,11 @@ public String description() { @Test public void testClosedSocket() throws Exception { final AtomicBoolean runnableCalled = new AtomicBoolean(); - Runnable r = new Runnable() { - @Override - public void run() { - runnableCalled.set(true); - } - }; Socket s = createClosableSocket(); s.close(); - this.socketCloser.asyncClose(s, "A", r); - Wait.pause(10); - assertEquals(false, runnableCalled.get()); + this.socketCloser.asyncClose(s, "A", () -> runnableCalled.set(true)); + Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> !runnableCalled.get()); } /** @@ -170,25 +125,11 @@ public void run() { @Test public void testClosedSocketCloser() { final AtomicBoolean runnableCalled = new AtomicBoolean(); - Runnable r = new Runnable() { - @Override - public void run() { - runnableCalled.set(true); - } - }; - final Socket s = createClosableSocket(); + final Socket closableSocket = createClosableSocket(); this.socketCloser.close(); - this.socketCloser.asyncClose(s, "A", r); - WaitCriterion wc = new WaitCriterion() { - public boolean done() { - return runnableCalled.get() && s.isClosed(); - } - - public String description() { - return "runnable was not called or socket was not closed"; - } - }; - Wait.waitForCriterion(wc, 5000, 10, true); + this.socketCloser.asyncClose(closableSocket, "A", () -> runnableCalled.set(true)); + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .until(() -> runnableCalled.get() && closableSocket.isClosed()); } }