Skip to content

Commit

Permalink
[GS] Adds threaded command management + custom keepalive
Browse files Browse the repository at this point in the history
GS seems to have its own automatic keepalive, but there's no way of tuning it properly, hence the custom one that runs in parallel.
To deactivate, set the ping_interval parameter to a negative number
  • Loading branch information
lesquoyb committed Aug 4, 2023
1 parent 3a54d19 commit e7abe5c
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 25 deletions.
Expand Up @@ -169,7 +169,7 @@ public void initParam(final GamaJsonList p) {
}

/**
* Inits the end contion.
* Inits the end condition.
*
* @param cond
* the cond
Expand Down
@@ -1,19 +1,36 @@
package msi.gama.headless.listener;

import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;

import org.java_websocket.WebSocket;
import org.java_websocket.enums.ReadyState;

import msi.gama.runtime.concurrent.GamaExecutorService;
import msi.gama.util.IMap;
import msi.gama.util.file.json.Jsoner;

public class CommandExecutor {

private final Map<String, ISocketCommand> COMMANDS;
protected final Map<String, ISocketCommand> COMMANDS;

protected volatile Queue<Entry<WebSocket, IMap<String, Object>>> commandQueue;

protected final Thread commandExecutionThread = new Thread(() -> {
while (true) {
while(!commandQueue.isEmpty()) {
var cmd = commandQueue.poll();
process(cmd.getKey(), cmd.getValue());
}
}
});


public CommandExecutor() {
final Map<String, ISocketCommand> cmds = new HashMap<>();
cmds.put("load", new LoadCommand());
Expand All @@ -31,21 +48,36 @@ public CommandExecutor() {
cmds.put("fetch", new FetchCommand());

COMMANDS = Collections.unmodifiableMap(cmds);

commandQueue = new LinkedList<Map.Entry<WebSocket,IMap<String,Object>>>();
commandExecutionThread.setUncaughtExceptionHandler(GamaExecutorService.EXCEPTION_HANDLER);
commandExecutionThread.start();
}

public void process(final WebSocket socket, final IMap<String, Object> map) {
public void pushCommand(final WebSocket socket, final IMap<String, Object> map) {
commandQueue.add(new AbstractMap.SimpleEntry<WebSocket, IMap<String, Object>>(socket, map));
}

protected void process(final WebSocket socket, final IMap<String, Object> map) {
final String cmd_type = map.get("type").toString();
ISocketCommand command = COMMANDS.get(cmd_type);

if (command == null) {
throw new IllegalArgumentException("Invalid command type: " + cmd_type);
}

var res = command.execute(socket, map);
if(res!=null) {
if(socket.getReadyState().equals(ReadyState.OPEN))
socket.send(Jsoner.serialize(res));
}
// Executes the command in a separate thread so the executor can
// continue with the next one without waiting for it to finish
new Thread(() -> {
var res = command.execute(socket, map);
if(res!=null) {
if(socket.getReadyState().equals(ReadyState.OPEN)) {
socket.send(Jsoner.serialize(res));
}
}
}).start();
}



}
Expand Up @@ -29,8 +29,6 @@ public class FetchCommand implements ISocketCommand {
@Override
public CommandResponse execute(final WebSocket socket, final IMap<String, Object> map) {
final String exp_id = map.get("exp_id") != null ? map.get("exp_id").toString() : "";
final String socket_id =
map.get("socket_id") != null ? map.get("socket_id").toString() : "" + socket.hashCode();

final Object filepath = map.get("file");
final Object access = map.get("access");
Expand Down
Expand Up @@ -32,7 +32,7 @@ public ConcurrentHashMap<String, ConcurrentHashMap<String, ManualExperimentJob>>

private static PrintStream errorStream;

public GamaListener(final int p, final Application a, final boolean secure, final String jksPath, final String spwd, final String kpwd) {
public GamaListener(final int p, final Application a, final boolean secure, final String jksPath, final String spwd, final String kpwd, final int ping_interval) {
File currentJavaJarFile = new File(
GamaListener.class.getProtectionDomain().getCodeSource().getLocation().getPath());
String currentJavaJarFilePath = currentJavaJarFile.getAbsolutePath();
Expand All @@ -41,16 +41,16 @@ public GamaListener(final int p, final Application a, final boolean secure, fina

Globals.IMAGES_PATH = Globals.TEMP_PATH + "\\snapshot";
GAMA.setHeadLessMode(true, new GamaServerGUIHandler()); //todo: done here and in headless simulation loader, should be refactored
createSocketServer(p, a, secure, jksPath, spwd, kpwd);
createSocketServer(p, a, secure, jksPath, spwd, kpwd, ping_interval);
}

/**
* Creates the socket server.
*
* @throws UnknownHostException the unknown host exception
*/
public void createSocketServer(final int port, final Application a, final boolean ssl, final String jksPath, final String spwd, final String kpwd) {
instance = new GamaWebSocketServer(port, a, this, ssl,jksPath,spwd,kpwd);
public void createSocketServer(final int port, final Application a, final boolean ssl, final String jksPath, final String spwd, final String kpwd, final int ping_interval) {
instance = new GamaWebSocketServer(port, a, this, ssl,jksPath,spwd,kpwd, ping_interval);
instance.start();
System.out.println("Gama Listener started on port: " + instance.getPort());

Expand Down
Expand Up @@ -21,14 +21,19 @@
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.KeyStore;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.TrustManagerFactory;

import org.java_websocket.WebSocket;
import org.java_websocket.framing.Framedata;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.SSLParametersWebSocketServerFactory;
import org.java_websocket.server.WebSocketServer;
Expand All @@ -44,7 +49,6 @@
import msi.gama.headless.runtime.Application;
import msi.gama.headless.script.ExperimentationPlanFactory;
import msi.gama.util.GamaMapFactory;
import msi.gama.util.IList;
import msi.gama.util.IMap;
import msi.gama.util.file.json.Jsoner;
import ummisco.gama.dev.utils.DEBUG;
Expand Down Expand Up @@ -82,6 +86,11 @@ public void set_listener(final GamaListener _listener) {
/** The cmd helper. */
CommandExecutor cmdHelper;

// variables for the keepalive pings
public final boolean canPing; // false if pingInterval is negative
public final int pingInterval; // the time interval between two ping requests in ms
protected Map<WebSocket, Timer> pingTimers; // map of all connected clients and their associated timers running ping requests

/**
* Instantiates a new gama web socket server.
*
Expand All @@ -94,8 +103,14 @@ public void set_listener(final GamaListener _listener) {
* @param ssl
* the ssl
*/
public GamaWebSocketServer(final int port, final Application a, final GamaListener l, final boolean ssl, final String jksPath, final String spwd, final String kpwd) {
public GamaWebSocketServer(final int port, final Application a, final GamaListener l, final boolean ssl, final String jksPath, final String spwd, final String kpwd, final int ping_interval) {
super(new InetSocketAddress(port));

canPing = ping_interval >= 0;
pingInterval = ping_interval;
pingTimers = new HashMap<WebSocket, Timer>();


if (a.verbose) { DEBUG.ON(); }
cmdHelper = new CommandExecutor();
if (ssl) {
Expand Down Expand Up @@ -148,7 +163,17 @@ public void onOpen(final WebSocket conn, final ClientHandshake handshake) {
DEBUG.OUT(conn.getRemoteSocketAddress().getAddress().getHostAddress() + " entered the room!");
conn.send(Jsoner
.serialize(new GamaServerMessage(GamaServerMessageType.ConnectionSuccessful, "" + conn.hashCode())));


if (canPing) {
var timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
conn.sendPing();
}
}, 0, pingInterval);
pingTimers.put(conn, timer);
}
// String path = URI.create(handshake.getResourceDescriptor()).getPath();
}

Expand All @@ -159,8 +184,26 @@ public void onOpen(final WebSocket conn, final ClientHandshake handshake) {
*/
public Application getDefaultApp() { return app; }

@Override
public void onWebsocketPing(WebSocket conn, Framedata f) {
// TODO Auto-generated method stub
super.onWebsocketPing(conn, f);
}

@Override
public void onWebsocketPong(WebSocket conn, Framedata f) {
// TODO Auto-generated method stub
super.onWebsocketPong(conn, f);
}

@Override
public void onClose(final WebSocket conn, final int code, final String reason, final boolean remote) {

var timer = pingTimers.remove(conn);
if (timer != null) {
timer.cancel();
}

if (_listener.getLaunched_experiments().get("" + conn.hashCode()) != null) {
for (ManualExperimentJob e : _listener.getLaunched_experiments().get("" + conn.hashCode()).values()) {
e.controller.directPause();
Expand Down Expand Up @@ -217,13 +260,13 @@ public void onMessage(final WebSocket socket, final String message) {
final String socket_id = map.get("socket_id") != null ? map.get("socket_id").toString() : ("" + socket.hashCode());
if(get_listener().getExperiment(socket_id, exp_id)!=null && !get_listener().getExperiment(socket_id, exp_id).controller.isPaused() ) {
get_listener().getExperiment(socket_id, exp_id).controller.getScope().getSimulation().postOneShotAction(scope1 -> {
cmdHelper.process(socket, map);
cmdHelper.pushCommand(socket, map);
// System.out.println(map.get("type"));
return null;
});
}else {

cmdHelper.process(socket, map);
cmdHelper.pushCommand(socket, map);
}

} catch (Exception e1) {
Expand Down
Expand Up @@ -15,11 +15,9 @@
import java.util.concurrent.Semaphore;

import org.java_websocket.WebSocket;
import org.locationtech.jts.util.Debug;

import msi.gama.common.interfaces.IGui;
import msi.gama.headless.core.GamaHeadlessException;
import msi.gama.headless.core.GamaServerMessage;
import msi.gama.headless.core.GamaServerMessageType;
import msi.gama.headless.job.ManualExperimentJob;
import msi.gama.kernel.experiment.ExperimentAgent;
Expand Down
Expand Up @@ -29,7 +29,7 @@ public CommandResponse execute(final WebSocket socket, IMap<String, Object> map)
for (int i = 0 ; i < nb_step ; i++) {
try {
if (sync) {
gama_exp.controller._job.doStep();
gama_exp.doStep();
} else {
gama_exp.controller.userStep();
}
Expand Down
19 changes: 16 additions & 3 deletions msi.gama.headless/src/msi/gama/headless/runtime/Application.java
Expand Up @@ -83,6 +83,8 @@ public class Application implements IApplication {

/** The Constant THREAD_PARAMETER. */
final public static String THREAD_PARAMETER = "-hpc";

final public static String PING_INTERVAL = "-ping_interval";

/** The Constant SOCKET_PARAMETER. */
final public static String SOCKET_PARAMETER = "-socket";
Expand Down Expand Up @@ -133,6 +135,9 @@ public class Application implements IApplication {
/** The socket. */
public int socket = -1;

// the interval between each ping sent by the server, -1 to deactivate this behaviour
public int ping_interval = 10000;

/** The console mode. */
public boolean consoleMode = false;

Expand Down Expand Up @@ -168,9 +173,13 @@ private static void showHelp() {
+ " -- start the console to write xml parameter file" + "\n\t\t"
+ VERBOSE_PARAMETER + " -- verbose mode" + "\n\t\t" + THREAD_PARAMETER
+ " [core] -- set the number of core available for experimentation" + "\n\t\t"
+ SOCKET_PARAMETER + " [socketPort] -- start socket pipeline to interact with another framework"
+ SOCKET_PARAMETER + " [socketPort] -- starts socket pipeline to interact with another framework"
+ "\n\t\t" + TUNNELING_PARAMETER
+ " -- start pipeline to interact with another framework"
+ "\n\t\t" + PING_INTERVAL + " [pingInterval] \t\t "
+ "-- when in server mode (socket parameter set), defines in milliseconds the time "
+ "between each ping packet sent to clients to keep alive the connection. "
+ "The default value is 10000, set to -1 to deactivate this behaviour."
+ "\n\t=== Infos ===" + "\n\t\t" + HELP_PARAMETER
+ " -- get the help of the command line" + "\n\t\t" + GAMA_VERSION
+ " -- get the the version of gama" + "\n\t=== Library Runner ===" + "\n\t\t"
Expand Down Expand Up @@ -234,6 +243,10 @@ private boolean checkParameters(final List<String> args) {
mustContainOutFolder = mustContainInFile = false;
this.socket = Integer.parseInt(after(args, SSOCKET_PARAMETER));
}
if (args.contains(PING_INTERVAL)) {
size = size - 2;
this.ping_interval = Integer.parseInt(after(args, PING_INTERVAL));
}
if (args.contains(THREAD_PARAMETER)) {
size = size - 2;
processorQueue.setNumberOfThreads(Integer.parseInt(after(args, THREAD_PARAMETER)));
Expand Down Expand Up @@ -363,13 +376,13 @@ public Object start(final IApplicationContext context) throws Exception {
buildXML(args);
} else if (args.contains(SOCKET_PARAMETER)) {
// GamaListener.newInstance(this.socket, this);
new GamaListener(this.socket, this, false, "", "", "");
new GamaListener(this.socket, this, false, "", "", "", this.ping_interval);
} else if (args.contains(SSOCKET_PARAMETER)) {
final String jks = args.contains(SSOCKET_PARAMETER_JKSPATH) ? after(args, SSOCKET_PARAMETER_JKSPATH) : "";
final String spwd = args.contains(SSOCKET_PARAMETER_SPWD) ? after(args, SSOCKET_PARAMETER_SPWD) : "";
final String kpwd = args.contains(SSOCKET_PARAMETER_KPWD) ? after(args, SSOCKET_PARAMETER_KPWD) : "";
// System.out.println(jks+" "+spwd+" "+kpwd);
new GamaListener(this.socket, this, true, jks, spwd, kpwd);
new GamaListener(this.socket, this, true, jks, spwd, kpwd, ping_interval);
} else {
runSimulation(args);
}
Expand Down

0 comments on commit e7abe5c

Please sign in to comment.