From 4b743df7888f9a206252356d4bc59dabca25cc2c Mon Sep 17 00:00:00 2001 From: newk5 Date: Sat, 27 Nov 2021 07:31:01 +0000 Subject: [PATCH] Provide way to start the client/server as daemons --- .../org/java_websocket/AbstractWebSocket.java | 30 +++++++- .../client/WebSocketClient.java | 2 + .../server/WebSocketServer.java | 18 ++++- .../util/NamedThreadFactory.java | 8 ++ .../server/DaemonThreadTest.java | 75 +++++++++++++++++++ 5 files changed, 131 insertions(+), 2 deletions(-) create mode 100644 src/test/java/org/java_websocket/server/DaemonThreadTest.java diff --git a/src/main/java/org/java_websocket/AbstractWebSocket.java b/src/main/java/org/java_websocket/AbstractWebSocket.java index c3e77a08..ee3cc13d 100644 --- a/src/main/java/org/java_websocket/AbstractWebSocket.java +++ b/src/main/java/org/java_websocket/AbstractWebSocket.java @@ -90,6 +90,13 @@ public abstract class AbstractWebSocket extends WebSocketAdapter { */ private boolean websocketRunning = false; + /** + * Attribute to start internal threads as daemon + * + * @since 1.5.6 + */ + private boolean daemon = false; + /** * Attribute to sync on */ @@ -182,7 +189,7 @@ protected void startConnectionLostTimer() { private void restartConnectionLostTimer() { cancelConnectionLostTimer(); connectionLostCheckerService = Executors - .newSingleThreadScheduledExecutor(new NamedThreadFactory("connectionLostChecker")); + .newSingleThreadScheduledExecutor(new NamedThreadFactory("connectionLostChecker", daemon)); Runnable connectionLostChecker = new Runnable() { /** @@ -308,4 +315,25 @@ public void setReuseAddr(boolean reuseAddr) { this.reuseAddr = reuseAddr; } + + /** + * Getter for daemon + * + * @return whether internal threads are spawned in daemon mode + * @since 1.5.6 + */ + public boolean isDaemon() { + return daemon; + } + + /** + * Setter for daemon + *

+ * Controls whether or not internal threads are spawned in daemon mode + * + * @since 1.5.6 + */ + public void setDaemon(boolean daemon) { + this.daemon = daemon; + } } diff --git a/src/main/java/org/java_websocket/client/WebSocketClient.java b/src/main/java/org/java_websocket/client/WebSocketClient.java index 955cd3d6..756534cd 100644 --- a/src/main/java/org/java_websocket/client/WebSocketClient.java +++ b/src/main/java/org/java_websocket/client/WebSocketClient.java @@ -373,6 +373,7 @@ public void connect() { throw new IllegalStateException("WebSocketClient objects are not reuseable"); } connectReadThread = new Thread(this); + connectReadThread.setDaemon(isDaemon()); connectReadThread.setName("WebSocketConnectReadThread-" + connectReadThread.getId()); connectReadThread.start(); } @@ -515,6 +516,7 @@ public void run() { } } writeThread = new Thread(new WebsocketWriteThread(this)); + writeThread.setDaemon(isDaemon()); writeThread.start(); byte[] rawbuffer = new byte[WebSocketImpl.RCVBUF]; diff --git a/src/main/java/org/java_websocket/server/WebSocketServer.java b/src/main/java/org/java_websocket/server/WebSocketServer.java index 9dbf004d..5d0c3066 100644 --- a/src/main/java/org/java_websocket/server/WebSocketServer.java +++ b/src/main/java/org/java_websocket/server/WebSocketServer.java @@ -245,7 +245,9 @@ public void start() { if (selectorthread != null) { throw new IllegalStateException(getClass().getName() + " can only be started once."); } - new Thread(this).start(); + Thread t = new Thread(this); + t.setDaemon(isDaemon()); + t.start(); } public void stop(int timeout) throws InterruptedException { @@ -326,6 +328,20 @@ public int getPort() { return port; } + @Override + public void setDaemon(boolean daemon) { + // pass it to the AbstractWebSocket too, to use it on the connectionLostChecker thread factory + super.setDaemon(daemon); + // we need to apply this to the decoders as well since they were created during the constructor + for (WebSocketWorker w : decoders) { + if (w.isAlive()) { + throw new IllegalStateException("Cannot call setDaemon after server is already started!"); + } else + w.setDaemon(daemon); + } + } + } + /** * Get the list of active drafts * diff --git a/src/main/java/org/java_websocket/util/NamedThreadFactory.java b/src/main/java/org/java_websocket/util/NamedThreadFactory.java index 2a424fe1..19091c01 100644 --- a/src/main/java/org/java_websocket/util/NamedThreadFactory.java +++ b/src/main/java/org/java_websocket/util/NamedThreadFactory.java @@ -34,14 +34,22 @@ public class NamedThreadFactory implements ThreadFactory { private final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory(); private final AtomicInteger threadNumber = new AtomicInteger(1); private final String threadPrefix; + private final boolean daemon; public NamedThreadFactory(String threadPrefix) { this.threadPrefix = threadPrefix; + this.daemon = false; + } + + public NamedThreadFactory(String threadPrefix, boolean daemon) { + this.threadPrefix = threadPrefix; + this.daemon = daemon; } @Override public Thread newThread(Runnable runnable) { Thread thread = defaultThreadFactory.newThread(runnable); + thread.setDaemon(daemon); thread.setName(threadPrefix + "-" + threadNumber); return thread; } diff --git a/src/test/java/org/java_websocket/server/DaemonThreadTest.java b/src/test/java/org/java_websocket/server/DaemonThreadTest.java new file mode 100644 index 00000000..f1b25c6f --- /dev/null +++ b/src/test/java/org/java_websocket/server/DaemonThreadTest.java @@ -0,0 +1,75 @@ +package org.java_websocket.server; + +import java.io.IOException; +import java.net.*; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import org.java_websocket.WebSocket; +import org.java_websocket.handshake.*; +import org.java_websocket.client.*; +import org.java_websocket.server.WebSocketServer; +import org.java_websocket.util.SocketUtil; +import org.junit.Test; +import static org.junit.Assert.assertTrue; + +public class DaemonThreadTest { + + @Test(timeout = 1000) + public void test_AllCreatedThreadsAreDaemon() throws Throwable { + + Set threadSet1 = Thread.getAllStackTraces().keySet(); + final CountDownLatch ready = new CountDownLatch(1); + + WebSocketServer server = new WebSocketServer(new InetSocketAddress(SocketUtil.getAvailablePort())) { + @Override + public void onOpen(WebSocket conn, ClientHandshake handshake) {} + @Override + public void onClose(WebSocket conn, int code, String reason, boolean remote) {} + @Override + public void onMessage(WebSocket conn, String message) {} + @Override + public void onError(WebSocket conn, Exception ex) {} + @Override + public void onStart() {} + }; + server.setDaemon(true); + server.setDaemon(false); + server.setDaemon(true); + server.start(); + + WebSocketClient client = new WebSocketClient(URI.create("ws://localhost:" + server.getPort())) { + @Override + public void onOpen(ServerHandshake handshake) { + ready.countDown(); + } + @Override + public void onClose(int code, String reason, boolean remote) {} + @Override + public void onMessage(String message) {} + @Override + public void onError(Exception ex) {} + }; + client.setDaemon(false); + client.setDaemon(true); + client.connect(); + + ready.await(); + Set threadSet2 = Thread.getAllStackTraces().keySet(); + threadSet2.removeAll(threadSet1); + + assertTrue("new threads created (no new threads indicates issue in test)", !threadSet2.isEmpty()); + + for (Thread t : threadSet2) + assertTrue(t.getName(), t.isDaemon()); + + boolean exception = false; + try { + server.setDaemon(false); + } catch(IllegalStateException e) { + exception = true; + } + assertTrue("exception was thrown when calling setDaemon on a running server", exception); + + server.stop(); + } +}