From e7abe5c69bed37472bb631aada11b88c33ee0716 Mon Sep 17 00:00:00 2001 From: Baptiste Lesquoy Date: Fri, 4 Aug 2023 11:37:33 +0700 Subject: [PATCH] [GS] Adds threaded command management + custom keepalive GS seems to have its own automatic keepalive, but there's no way of tuning it properly, hence the custom one that runs in parallel. To deactivate, set the ping_interval parameter to a negative number --- .../headless/job/ManualExperimentJob.java | 2 +- .../headless/listener/CommandExecutor.java | 46 +++++++++++++--- .../gama/headless/listener/FetchCommand.java | 2 - .../gama/headless/listener/GamaListener.java | 8 +-- .../listener/GamaWebSocketServer.java | 53 +++++++++++++++++-- .../listener/ServerExperimentController.java | 2 - .../gama/headless/listener/StepCommand.java | 2 +- .../gama/headless/runtime/Application.java | 19 +++++-- 8 files changed, 109 insertions(+), 25 deletions(-) diff --git a/msi.gama.headless/src/msi/gama/headless/job/ManualExperimentJob.java b/msi.gama.headless/src/msi/gama/headless/job/ManualExperimentJob.java index cdb3d74f54..94a674234c 100644 --- a/msi.gama.headless/src/msi/gama/headless/job/ManualExperimentJob.java +++ b/msi.gama.headless/src/msi/gama/headless/job/ManualExperimentJob.java @@ -169,7 +169,7 @@ public void initParam(final GamaJsonList p) { } /** - * Inits the end contion. + * Inits the end condition. * * @param cond * the cond diff --git a/msi.gama.headless/src/msi/gama/headless/listener/CommandExecutor.java b/msi.gama.headless/src/msi/gama/headless/listener/CommandExecutor.java index 0ae16d0a6d..27407b2f90 100644 --- a/msi.gama.headless/src/msi/gama/headless/listener/CommandExecutor.java +++ b/msi.gama.headless/src/msi/gama/headless/listener/CommandExecutor.java @@ -1,19 +1,36 @@ package msi.gama.headless.listener; +import java.util.AbstractMap; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; import java.util.Map; +import java.util.Map.Entry; +import java.util.Queue; import org.java_websocket.WebSocket; import org.java_websocket.enums.ReadyState; +import msi.gama.runtime.concurrent.GamaExecutorService; import msi.gama.util.IMap; import msi.gama.util.file.json.Jsoner; public class CommandExecutor { - private final Map COMMANDS; + protected final Map COMMANDS; + protected volatile Queue>> commandQueue; + + protected final Thread commandExecutionThread = new Thread(() -> { + while (true) { + while(!commandQueue.isEmpty()) { + var cmd = commandQueue.poll(); + process(cmd.getKey(), cmd.getValue()); + } + } + }); + + public CommandExecutor() { final Map cmds = new HashMap<>(); cmds.put("load", new LoadCommand()); @@ -31,9 +48,17 @@ public CommandExecutor() { cmds.put("fetch", new FetchCommand()); COMMANDS = Collections.unmodifiableMap(cmds); + + commandQueue = new LinkedList>>(); + commandExecutionThread.setUncaughtExceptionHandler(GamaExecutorService.EXCEPTION_HANDLER); + commandExecutionThread.start(); } - public void process(final WebSocket socket, final IMap map) { + public void pushCommand(final WebSocket socket, final IMap map) { + commandQueue.add(new AbstractMap.SimpleEntry>(socket, map)); + } + + protected void process(final WebSocket socket, final IMap map) { final String cmd_type = map.get("type").toString(); ISocketCommand command = COMMANDS.get(cmd_type); @@ -41,11 +66,18 @@ public void process(final WebSocket socket, final IMap map) { throw new IllegalArgumentException("Invalid command type: " + cmd_type); } - var res = command.execute(socket, map); - if(res!=null) { - if(socket.getReadyState().equals(ReadyState.OPEN)) - socket.send(Jsoner.serialize(res)); - } + // Executes the command in a separate thread so the executor can + // continue with the next one without waiting for it to finish + new Thread(() -> { + var res = command.execute(socket, map); + if(res!=null) { + if(socket.getReadyState().equals(ReadyState.OPEN)) { + socket.send(Jsoner.serialize(res)); + } + } + }).start(); } + + } \ No newline at end of file diff --git a/msi.gama.headless/src/msi/gama/headless/listener/FetchCommand.java b/msi.gama.headless/src/msi/gama/headless/listener/FetchCommand.java index f15f495b36..41a0e94c81 100644 --- a/msi.gama.headless/src/msi/gama/headless/listener/FetchCommand.java +++ b/msi.gama.headless/src/msi/gama/headless/listener/FetchCommand.java @@ -29,8 +29,6 @@ public class FetchCommand implements ISocketCommand { @Override public CommandResponse execute(final WebSocket socket, final IMap map) { final String exp_id = map.get("exp_id") != null ? map.get("exp_id").toString() : ""; - final String socket_id = - map.get("socket_id") != null ? map.get("socket_id").toString() : "" + socket.hashCode(); final Object filepath = map.get("file"); final Object access = map.get("access"); diff --git a/msi.gama.headless/src/msi/gama/headless/listener/GamaListener.java b/msi.gama.headless/src/msi/gama/headless/listener/GamaListener.java index c541abfebe..acc1166875 100644 --- a/msi.gama.headless/src/msi/gama/headless/listener/GamaListener.java +++ b/msi.gama.headless/src/msi/gama/headless/listener/GamaListener.java @@ -32,7 +32,7 @@ public ConcurrentHashMap> private static PrintStream errorStream; - public GamaListener(final int p, final Application a, final boolean secure, final String jksPath, final String spwd, final String kpwd) { + public GamaListener(final int p, final Application a, final boolean secure, final String jksPath, final String spwd, final String kpwd, final int ping_interval) { File currentJavaJarFile = new File( GamaListener.class.getProtectionDomain().getCodeSource().getLocation().getPath()); String currentJavaJarFilePath = currentJavaJarFile.getAbsolutePath(); @@ -41,7 +41,7 @@ public GamaListener(final int p, final Application a, final boolean secure, fina Globals.IMAGES_PATH = Globals.TEMP_PATH + "\\snapshot"; GAMA.setHeadLessMode(true, new GamaServerGUIHandler()); //todo: done here and in headless simulation loader, should be refactored - createSocketServer(p, a, secure, jksPath, spwd, kpwd); + createSocketServer(p, a, secure, jksPath, spwd, kpwd, ping_interval); } /** @@ -49,8 +49,8 @@ public GamaListener(final int p, final Application a, final boolean secure, fina * * @throws UnknownHostException the unknown host exception */ - public void createSocketServer(final int port, final Application a, final boolean ssl, final String jksPath, final String spwd, final String kpwd) { - instance = new GamaWebSocketServer(port, a, this, ssl,jksPath,spwd,kpwd); + public void createSocketServer(final int port, final Application a, final boolean ssl, final String jksPath, final String spwd, final String kpwd, final int ping_interval) { + instance = new GamaWebSocketServer(port, a, this, ssl,jksPath,spwd,kpwd, ping_interval); instance.start(); System.out.println("Gama Listener started on port: " + instance.getPort()); diff --git a/msi.gama.headless/src/msi/gama/headless/listener/GamaWebSocketServer.java b/msi.gama.headless/src/msi/gama/headless/listener/GamaWebSocketServer.java index 997aaf568c..ce033e87b3 100644 --- a/msi.gama.headless/src/msi/gama/headless/listener/GamaWebSocketServer.java +++ b/msi.gama.headless/src/msi/gama/headless/listener/GamaWebSocketServer.java @@ -21,7 +21,11 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.security.KeyStore; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; @@ -29,6 +33,7 @@ import javax.net.ssl.TrustManagerFactory; import org.java_websocket.WebSocket; +import org.java_websocket.framing.Framedata; import org.java_websocket.handshake.ClientHandshake; import org.java_websocket.server.SSLParametersWebSocketServerFactory; import org.java_websocket.server.WebSocketServer; @@ -44,7 +49,6 @@ import msi.gama.headless.runtime.Application; import msi.gama.headless.script.ExperimentationPlanFactory; import msi.gama.util.GamaMapFactory; -import msi.gama.util.IList; import msi.gama.util.IMap; import msi.gama.util.file.json.Jsoner; import ummisco.gama.dev.utils.DEBUG; @@ -82,6 +86,11 @@ public void set_listener(final GamaListener _listener) { /** The cmd helper. */ CommandExecutor cmdHelper; + // variables for the keepalive pings + public final boolean canPing; // false if pingInterval is negative + public final int pingInterval; // the time interval between two ping requests in ms + protected Map pingTimers; // map of all connected clients and their associated timers running ping requests + /** * Instantiates a new gama web socket server. * @@ -94,8 +103,14 @@ public void set_listener(final GamaListener _listener) { * @param ssl * the ssl */ - public GamaWebSocketServer(final int port, final Application a, final GamaListener l, final boolean ssl, final String jksPath, final String spwd, final String kpwd) { + public GamaWebSocketServer(final int port, final Application a, final GamaListener l, final boolean ssl, final String jksPath, final String spwd, final String kpwd, final int ping_interval) { super(new InetSocketAddress(port)); + + canPing = ping_interval >= 0; + pingInterval = ping_interval; + pingTimers = new HashMap(); + + if (a.verbose) { DEBUG.ON(); } cmdHelper = new CommandExecutor(); if (ssl) { @@ -148,7 +163,17 @@ public void onOpen(final WebSocket conn, final ClientHandshake handshake) { DEBUG.OUT(conn.getRemoteSocketAddress().getAddress().getHostAddress() + " entered the room!"); conn.send(Jsoner .serialize(new GamaServerMessage(GamaServerMessageType.ConnectionSuccessful, "" + conn.hashCode()))); - + + if (canPing) { + var timer = new Timer(); + timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + conn.sendPing(); + } + }, 0, pingInterval); + pingTimers.put(conn, timer); + } // String path = URI.create(handshake.getResourceDescriptor()).getPath(); } @@ -159,8 +184,26 @@ public void onOpen(final WebSocket conn, final ClientHandshake handshake) { */ public Application getDefaultApp() { return app; } + @Override + public void onWebsocketPing(WebSocket conn, Framedata f) { + // TODO Auto-generated method stub + super.onWebsocketPing(conn, f); + } + + @Override + public void onWebsocketPong(WebSocket conn, Framedata f) { + // TODO Auto-generated method stub + super.onWebsocketPong(conn, f); + } + @Override public void onClose(final WebSocket conn, final int code, final String reason, final boolean remote) { + + var timer = pingTimers.remove(conn); + if (timer != null) { + timer.cancel(); + } + if (_listener.getLaunched_experiments().get("" + conn.hashCode()) != null) { for (ManualExperimentJob e : _listener.getLaunched_experiments().get("" + conn.hashCode()).values()) { e.controller.directPause(); @@ -217,13 +260,13 @@ public void onMessage(final WebSocket socket, final String message) { final String socket_id = map.get("socket_id") != null ? map.get("socket_id").toString() : ("" + socket.hashCode()); if(get_listener().getExperiment(socket_id, exp_id)!=null && !get_listener().getExperiment(socket_id, exp_id).controller.isPaused() ) { get_listener().getExperiment(socket_id, exp_id).controller.getScope().getSimulation().postOneShotAction(scope1 -> { - cmdHelper.process(socket, map); + cmdHelper.pushCommand(socket, map); // System.out.println(map.get("type")); return null; }); }else { - cmdHelper.process(socket, map); + cmdHelper.pushCommand(socket, map); } } catch (Exception e1) { diff --git a/msi.gama.headless/src/msi/gama/headless/listener/ServerExperimentController.java b/msi.gama.headless/src/msi/gama/headless/listener/ServerExperimentController.java index 4a5daa06b2..39acb02084 100644 --- a/msi.gama.headless/src/msi/gama/headless/listener/ServerExperimentController.java +++ b/msi.gama.headless/src/msi/gama/headless/listener/ServerExperimentController.java @@ -15,11 +15,9 @@ import java.util.concurrent.Semaphore; import org.java_websocket.WebSocket; -import org.locationtech.jts.util.Debug; import msi.gama.common.interfaces.IGui; import msi.gama.headless.core.GamaHeadlessException; -import msi.gama.headless.core.GamaServerMessage; import msi.gama.headless.core.GamaServerMessageType; import msi.gama.headless.job.ManualExperimentJob; import msi.gama.kernel.experiment.ExperimentAgent; diff --git a/msi.gama.headless/src/msi/gama/headless/listener/StepCommand.java b/msi.gama.headless/src/msi/gama/headless/listener/StepCommand.java index 8f20f524ac..a34da018af 100644 --- a/msi.gama.headless/src/msi/gama/headless/listener/StepCommand.java +++ b/msi.gama.headless/src/msi/gama/headless/listener/StepCommand.java @@ -29,7 +29,7 @@ public CommandResponse execute(final WebSocket socket, IMap map) for (int i = 0 ; i < nb_step ; i++) { try { if (sync) { - gama_exp.controller._job.doStep(); + gama_exp.doStep(); } else { gama_exp.controller.userStep(); } diff --git a/msi.gama.headless/src/msi/gama/headless/runtime/Application.java b/msi.gama.headless/src/msi/gama/headless/runtime/Application.java index 44a9361b9f..af2b047ac0 100644 --- a/msi.gama.headless/src/msi/gama/headless/runtime/Application.java +++ b/msi.gama.headless/src/msi/gama/headless/runtime/Application.java @@ -83,6 +83,8 @@ public class Application implements IApplication { /** The Constant THREAD_PARAMETER. */ final public static String THREAD_PARAMETER = "-hpc"; + + final public static String PING_INTERVAL = "-ping_interval"; /** The Constant SOCKET_PARAMETER. */ final public static String SOCKET_PARAMETER = "-socket"; @@ -133,6 +135,9 @@ public class Application implements IApplication { /** The socket. */ public int socket = -1; + // the interval between each ping sent by the server, -1 to deactivate this behaviour + public int ping_interval = 10000; + /** The console mode. */ public boolean consoleMode = false; @@ -168,9 +173,13 @@ private static void showHelp() { + " -- start the console to write xml parameter file" + "\n\t\t" + VERBOSE_PARAMETER + " -- verbose mode" + "\n\t\t" + THREAD_PARAMETER + " [core] -- set the number of core available for experimentation" + "\n\t\t" - + SOCKET_PARAMETER + " [socketPort] -- start socket pipeline to interact with another framework" + + SOCKET_PARAMETER + " [socketPort] -- starts socket pipeline to interact with another framework" + "\n\t\t" + TUNNELING_PARAMETER + " -- start pipeline to interact with another framework" + + "\n\t\t" + PING_INTERVAL + " [pingInterval] \t\t " + + "-- when in server mode (socket parameter set), defines in milliseconds the time " + + "between each ping packet sent to clients to keep alive the connection. " + + "The default value is 10000, set to -1 to deactivate this behaviour." + "\n\t=== Infos ===" + "\n\t\t" + HELP_PARAMETER + " -- get the help of the command line" + "\n\t\t" + GAMA_VERSION + " -- get the the version of gama" + "\n\t=== Library Runner ===" + "\n\t\t" @@ -234,6 +243,10 @@ private boolean checkParameters(final List args) { mustContainOutFolder = mustContainInFile = false; this.socket = Integer.parseInt(after(args, SSOCKET_PARAMETER)); } + if (args.contains(PING_INTERVAL)) { + size = size - 2; + this.ping_interval = Integer.parseInt(after(args, PING_INTERVAL)); + } if (args.contains(THREAD_PARAMETER)) { size = size - 2; processorQueue.setNumberOfThreads(Integer.parseInt(after(args, THREAD_PARAMETER))); @@ -363,13 +376,13 @@ public Object start(final IApplicationContext context) throws Exception { buildXML(args); } else if (args.contains(SOCKET_PARAMETER)) { // GamaListener.newInstance(this.socket, this); - new GamaListener(this.socket, this, false, "", "", ""); + new GamaListener(this.socket, this, false, "", "", "", this.ping_interval); } else if (args.contains(SSOCKET_PARAMETER)) { final String jks = args.contains(SSOCKET_PARAMETER_JKSPATH) ? after(args, SSOCKET_PARAMETER_JKSPATH) : ""; final String spwd = args.contains(SSOCKET_PARAMETER_SPWD) ? after(args, SSOCKET_PARAMETER_SPWD) : ""; final String kpwd = args.contains(SSOCKET_PARAMETER_KPWD) ? after(args, SSOCKET_PARAMETER_KPWD) : ""; // System.out.println(jks+" "+spwd+" "+kpwd); - new GamaListener(this.socket, this, true, jks, spwd, kpwd); + new GamaListener(this.socket, this, true, jks, spwd, kpwd, ping_interval); } else { runSimulation(args); }