Skip to content

Commit

Permalink
headless listener, some manangement of stocking experiments
Browse files Browse the repository at this point in the history
  • Loading branch information
hqnghi88 committed Mar 23, 2022
1 parent 61195d1 commit a33244b
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 49 deletions.
6 changes: 4 additions & 2 deletions msi.gama.headless/SimpleGUI.html
Expand Up @@ -332,6 +332,7 @@ <h3>Create a new <code>Dialog</code> Widget</h3>
}
// send message from the form
var exp_compiled;
var socket_id = "";
var exp_id = 0;
var exp_out = 0;
var exp_div_out = 0;
Expand Down Expand Up @@ -398,8 +399,9 @@ <h3>Create a new <code>Dialog</code> Widget</h3>
} else {
if (message.startsWith("exp@")) {
const myArray = message.split("@");
exp_id = myArray[1];
exp_out = myArray[2];
socket_id = myArray[1];
exp_id = myArray[2];
exp_out = myArray[3];
exp_div_out = $('.dialog_window').length + 1;
for (let i = 0; i < exp_out; i++) {
$('#new_window_title').val("Out" + i); $('#new_window_content').val('<div id="out' + i + '"><img width="500"></div>');
Expand Down
17 changes: 11 additions & 6 deletions msi.gama.headless/SimpleWebGIS.html
Expand Up @@ -114,8 +114,13 @@
zoom: 13 // starting zoom
});

var socket_id = "";
var exp_id = 0; var updateSource;
var socket = new WebSocket("ws://localhost:6868/launch");
var sk = new WebSocket("ws://localhost:6868/output");
sk.onclose = function (event) {
clearInterval(updateSource);
};
// socket.binaryType = "arraybuffer";
socket.addEventListener('open', (event) => {

Expand All @@ -124,17 +129,17 @@
socket.onmessage = function (event) {
exp_compiled = event.data;
if (exp_compiled.startsWith("exp@")) {
const myArray = exp_compiled.split("@");
// console.log(myArray[3], myArray[4]);
exp_id = myArray[1];
const myArray = exp_compiled.split("@");
socket_id = myArray[1];
exp_id = myArray[2];
map.flyTo({
center: [myArray[3], myArray[4]],
center: [myArray[4], myArray[5]],
essential: true,
zoom: 15
});
socket.send("play@" + exp_id);
const updateSource = setInterval(() => {
sk.send("output@" + exp_id+"@Individual");
updateSource = setInterval(() => {
sk.send("output@" + socket_id + "@" + exp_id + "@Individual");
sk.onmessage = function (event) {

let message = event.data;
Expand Down
2 changes: 1 addition & 1 deletion msi.gama.headless/msi.gama.headless.id4_full.launch
Expand Up @@ -19,7 +19,7 @@
<booleanAttribute key="org.eclipse.jdt.launching.ATTR_SHOW_CODEDETAILS_IN_EXCEPTION_MESSAGES" value="true"/>
<booleanAttribute key="org.eclipse.jdt.launching.ATTR_USE_START_ON_FIRST_THREAD" value="false"/>
<stringAttribute key="org.eclipse.jdt.launching.JAVA_COMMAND" value="java"/>
<stringAttribute key="org.eclipse.jdt.launching.JRE_CONTAINER" value="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/jdk-17.0.1"/>
<stringAttribute key="org.eclipse.jdt.launching.JRE_CONTAINER" value="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/jdk-17.0.2"/>
<stringAttribute key="org.eclipse.jdt.launching.PROGRAM_ARGUMENTS" value="-os ${target.os} -ws ${target.ws} -arch ${target.arch} -nl ${target.nl} -socket 6868"/>
<stringAttribute key="org.eclipse.jdt.launching.SOURCE_PATH_PROVIDER" value="org.eclipse.pde.ui.workbenchClasspathProvider"/>
<booleanAttribute key="pde.generated.config" value="false"/>
Expand Down
Expand Up @@ -18,27 +18,23 @@

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;

import msi.gama.headless.common.Globals;
import msi.gama.headless.job.ExperimentJob;
import msi.gama.headless.job.ManualExperimentJob;

/**
* A simple WebSocketServer implementation. Keeps track of a "chatroom".
Expand All @@ -59,7 +55,7 @@ public class GamaWebSocketServer extends WebSocketServer {
/** The instance. */
private static GamaWebSocketServer instance;
/** The simulations. */
final Map<String, ExperimentJob> simulations = new HashMap<>();
final private ConcurrentHashMap<String, ConcurrentHashMap<String, ExperimentJob>> launched_experiments = new ConcurrentHashMap<String, ConcurrentHashMap<String, ExperimentJob>>();
private static WebSocketPrintStream bufferStream;

public GamaWebSocketServer(int port, Application a) {
Expand All @@ -83,7 +79,7 @@ public GamaWebSocketServer(int port, Application a) {
}

void deleteFolder(File file) {
if(file.listFiles()!=null) {
if (file.listFiles() != null) {
for (File subFile : file.listFiles()) {
if (subFile.isDirectory()) {
deleteFolder(subFile);
Expand Down Expand Up @@ -157,9 +153,28 @@ public void onOpen(WebSocket conn, ClientHandshake handshake) {
}
}

public ConcurrentHashMap<String, ConcurrentHashMap<String, ExperimentJob>> getAllExperiments() {
return launched_experiments;
}

public ConcurrentHashMap<String, ExperimentJob> getExperimentsOf(final String socket) {
return launched_experiments.get(socket);
}

public ExperimentJob getExperiment(final String socket, final String expid) {
return launched_experiments.get(socket).get(expid);
}

@Override
public void onClose(WebSocket conn, int code, String reason, boolean remote) {
// simulations.clear();
if(launched_experiments.get(""+conn.hashCode())!=null) {
for (ExperimentJob e : launched_experiments.get(""+conn.hashCode()).values()) {
((ManualExperimentJob) e).paused = true;

e.dispose();
}
launched_experiments.get(""+conn.hashCode()).clear();
}
broadcast(conn + " has left the room!");
System.out.println(conn + " has left the room!");
}
Expand Down
63 changes: 38 additions & 25 deletions msi.gama.headless/src/msi/gama/headless/runtime/LaunchEndPoint.java
Expand Up @@ -20,6 +20,7 @@
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

import org.java_websocket.WebSocket;

Expand Down Expand Up @@ -126,7 +127,7 @@ public void launchGamlSimulation(final GamaWebSocketServer server, WebSocket soc
}
final String argExperimentName = args.get(args.size() - 2);
// final String argGamlFile = args.get(args.size() - 1);
if(!ff.exists())
if (!ff.exists())
return;
final List<IExperimentJob> jb = ExperimentationPlanFactory.buildExperiment(ff.getAbsoluteFile().toString());
ManualExperimentJob selectedJob = null;
Expand All @@ -146,7 +147,13 @@ public void launchGamlSimulation(final GamaWebSocketServer server, WebSocket soc
| GamaHeadlessException e) {
e.printStackTrace();
}
server.simulations.put(selectedJob.getExperimentID(), selectedJob);
if (server.getExperimentsOf(""+socket.hashCode()) == null) {
final ConcurrentHashMap<String, ExperimentJob> exps = new ConcurrentHashMap<String, ExperimentJob>();
server.getAllExperiments().put(""+socket.hashCode(), exps);

}
server.getExperimentsOf(""+socket.hashCode()).put(selectedJob.getExperimentID(), selectedJob);

final int size = selectedJob.getListenedVariables().length;
// String lst_out = "";
// if (size != 0) {
Expand All @@ -158,16 +165,18 @@ public void launchGamlSimulation(final GamaWebSocketServer server, WebSocket soc
IAgent agt = selectedJob.getSimulation().getSimulation();

IShape geom = Spatial.Projections.transform_CRS(agt.getScope(), agt.getGeometry(), "EPSG:4326");

socket.send("exp@" + selectedJob.getExperimentID() + "@" + size + "@" + geom.getLocation().x + "@"
+ geom.getLocation().y);
System.out.println("exp@" + ""+socket.hashCode() + "@" + selectedJob.getExperimentID() + "@" + size + "@"
+ geom.getLocation().x + "@" + geom.getLocation().y);
socket.send("exp@" + ""+socket.hashCode() + "@" + selectedJob.getExperimentID() + "@" + size + "@"
+ geom.getLocation().x + "@" + geom.getLocation().y);
((ManualExperimentJob) selectedJob).exportVariables();
// server.getDefaultApp().processorQueue.pushSimulation(selectedJob);
}

@Override
public void onMessage(final GamaWebSocketServer server, final WebSocket socket, final String message) {
// server.broadcast(message);
final String socket_id = ""+socket.hashCode();
System.out.println(socket + ": " + message);
String[] args = message.split("@");
if ("launch".equals(args[0])) {
Expand All @@ -182,51 +191,55 @@ public void onMessage(final GamaWebSocketServer server, final WebSocket socket,
if ("play".equals(args[0])) {
String id_exp = args[1];
System.out.println("play " + id_exp);
if (server.simulations.get(id_exp) != null && server.simulations.get(id_exp).getSimulation() != null) {
((ManualExperimentJob) server.simulations.get(id_exp)).paused = false;
((ManualExperimentJob) server.simulations.get(id_exp)).stepping = false;
if (server.getExperiment(socket_id, id_exp) != null
&& server.getExperiment(socket_id, id_exp).getSimulation() != null) {
((ManualExperimentJob) server.getExperiment(socket_id, id_exp)).paused = false;
((ManualExperimentJob) server.getExperiment(socket_id, id_exp)).stepping = false;
// DEBUG.TIMER("Simulation duration", () -> {
if (((ManualExperimentJob) server.simulations.get(id_exp)).internalThread == null) {
((ManualExperimentJob) server.simulations.get(id_exp)).internalThread = new Thread() {
if (((ManualExperimentJob) server.getExperiment(socket_id, id_exp)).internalThread == null) {
((ManualExperimentJob) server.getExperiment(socket_id, id_exp)).internalThread = new Thread() {
@Override
public void run() {

while (!server.simulations.get(id_exp).getSimulation().isInterrupted()) {
if (((ManualExperimentJob) server.simulations.get(id_exp)).stepping) {
((ManualExperimentJob) server.simulations.get(id_exp)).stepping = false;
((ManualExperimentJob) server.simulations.get(id_exp)).paused = true;
server.simulations.get(id_exp).doStep();
while (!server.getExperiment(socket_id, id_exp).getSimulation().isInterrupted()) {
if (((ManualExperimentJob) server.getExperiment(socket_id, id_exp)).stepping) {
((ManualExperimentJob) server.getExperiment(socket_id, id_exp)).stepping = false;
((ManualExperimentJob) server.getExperiment(socket_id, id_exp)).paused = true;
server.getExperiment(socket_id, id_exp).doStep();
}
if (!((ManualExperimentJob) server.simulations.get(id_exp)).paused)
server.simulations.get(id_exp).doStep();
if (!((ManualExperimentJob) server.getExperiment(socket_id, id_exp)).paused)
server.getExperiment(socket_id, id_exp).doStep();
}
}
};
((ManualExperimentJob) server.simulations.get(id_exp)).internalThread.start();
((ManualExperimentJob) server.getExperiment(socket_id, id_exp)).internalThread.start();
}
}
}

if ("step".equals(args[0])) {
String id_exp = args[1];
System.out.println("step " + id_exp);
if (server.simulations.get(id_exp) != null && server.simulations.get(id_exp).getSimulation() != null) {
((ManualExperimentJob) server.simulations.get(id_exp)).stepping = true;
if (server.getExperiment(socket_id, id_exp) != null
&& server.getExperiment(socket_id, id_exp).getSimulation() != null) {
((ManualExperimentJob) server.getExperiment(socket_id, id_exp)).stepping = true;
}
}
if ("pause".equals(args[0])) {
String id_exp = args[1];
System.out.println("pause " + id_exp);
if (server.simulations.get(id_exp) != null && server.simulations.get(id_exp).getSimulation() != null) {
((ManualExperimentJob) server.simulations.get(id_exp)).paused = true;
if (server.getExperiment(socket_id, id_exp) != null
&& server.getExperiment(socket_id, id_exp).getSimulation() != null) {
((ManualExperimentJob) server.getExperiment(socket_id, id_exp)).paused = true;
}
}
if ("stop".equals(args[0])) {
String id_exp = args[1];
System.out.println("stop " + id_exp);
if (server.simulations.get(id_exp) != null && server.simulations.get(id_exp).getSimulation() != null) {
((ManualExperimentJob) server.simulations.get(id_exp)).paused = true;
((ManualExperimentJob) server.simulations.get(id_exp)).dispose();
if (server.getExperiment(socket_id, id_exp) != null
&& server.getExperiment(socket_id, id_exp).getSimulation() != null) {
((ManualExperimentJob) server.getExperiment(socket_id, id_exp)).paused = true;
((ManualExperimentJob) server.getExperiment(socket_id, id_exp)).dispose();
}
}
if ("exit".equals(args[0])) {
Expand Down
Expand Up @@ -29,13 +29,14 @@ public void onOpen(WebSocket socket) {
public void onMessage(GamaWebSocketServer server, WebSocket socket, String message) {
// socket.send(message);

System.out.println(socket + ": " + message);
String[] args = message.split("@");
if ("output".equals(args[0])) {
String id_exp = args[1];
if (server.simulations.get(id_exp) != null && server.simulations.get(id_exp).getSimulation() != null) {
IList<? extends IShape> agents = server.simulations.get(id_exp).getSimulation().getSimulation()
.getMicroPopulation(args[2]);
final String socket_id=args[1];
System.out.println(socket_id + ": " + message);
final String id_exp = args[2];
if (server.getExperiment(socket_id,id_exp) != null && server.getExperiment(socket_id,id_exp).getSimulation() != null) {
IList<? extends IShape> agents = server.getExperiment(socket_id,id_exp).getSimulation().getSimulation()
.getMicroPopulation(args[3]);
// IList<? extends IShape> agents=GamaListFactory.create();
// for(IPopulation pop:simulator.getSimulation().getMicroPopulations()) {
// if(!(pop instanceof GridPopulation)) {
Expand All @@ -44,7 +45,7 @@ public void onMessage(GamaWebSocketServer server, WebSocket socket, String messa
// }
try {
socket.send(SaveHelper.buildGeoJSon(
server.simulations.get(id_exp).getSimulation().getSimulation().getScope(), agents));
server.getExperiment(socket_id,id_exp).getSimulation().getSimulation().getScope(), agents));
} catch (GamaRuntimeException | IOException | SchemaException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
Expand Down

0 comments on commit a33244b

Please sign in to comment.