diff --git a/platform-controller/src/main/java/org/hobbit/controller/ExperimentManager.java b/platform-controller/src/main/java/org/hobbit/controller/ExperimentManager.java index f4e647b2..cfe46a46 100644 --- a/platform-controller/src/main/java/org/hobbit/controller/ExperimentManager.java +++ b/platform-controller/src/main/java/org/hobbit/controller/ExperimentManager.java @@ -312,15 +312,22 @@ protected void prefetchImages(BenchmarkMetaData benchmark, SystemMetaData system * {@link #experimentStatus} object and therefore blocking all other operations * on that object. * + * @param sessionId + * the experiment ID to which the result model belongs to * @param data * binary data containing a serialized RDF model * @param function * a deserialization function transforming the binary data into an * RDF model */ - public void setResultModel(byte[] data, Function function) { + public void setResultModel(String sessionId, byte[] data, Function function) { synchronized (experimentMutex) { - setResultModel_unsecured(function.apply(data)); + if ((experimentStatus != null) && (experimentStatus.config != null) + && (sessionId.equals(experimentStatus.config.id))) { + setResultModel_unsecured(function.apply(data)); + } else { + LOGGER.warn("Got result model for {} which is not running.", sessionId); + } } } @@ -523,25 +530,31 @@ public void notifyTermination(String containerId, int exitCode) { * true if the message was sent by the system, * false if the benchmark controller is ready */ - public void systemOrBenchmarkReady(boolean systemReportedReady) { + public void systemOrBenchmarkReady(boolean systemReportedReady, String sessionId) { synchronized (experimentMutex) { - // If there is an experiment waiting with the state INIT and if - // both - system and benchmark are ready - if ((experimentStatus != null) && (experimentStatus.setReadyAndCheck(systemReportedReady) - && (experimentStatus.getState() == ExperimentStatus.States.INIT))) { - try { - startBenchmark_unsecured(); - } catch (IOException e) { - // Let's retry this + if ((experimentStatus != null) && (experimentStatus.config != null) + && (sessionId.equals(experimentStatus.config.id))) { + // If there is an experiment waiting with the state INIT and if + // both - system and benchmark are ready + if ((experimentStatus != null) && (experimentStatus.setReadyAndCheck(systemReportedReady) + && (experimentStatus.getState() == ExperimentStatus.States.INIT))) { try { startBenchmark_unsecured(); - } catch (IOException e2) { - LOGGER.error("Couldn't sent start signal to the benchmark controller. Terminating experiment.", - e2); - // We have to terminate the experiment - forceBenchmarkTerminate_unsecured(HobbitErrors.UnexpectedError); + } catch (IOException e) { + // Let's retry this + try { + startBenchmark_unsecured(); + } catch (IOException e2) { + LOGGER.error( + "Couldn't sent start signal to the benchmark controller. Terminating experiment.", + e2); + // We have to terminate the experiment + forceBenchmarkTerminate_unsecured(HobbitErrors.UnexpectedError); + } } } + } else { + LOGGER.warn("Got a ready message for benchmark or system of {} which is not running.", sessionId); } } } @@ -610,10 +623,13 @@ public void addStatusInfo(ControllerStatus status, String userName) { * Changes the state of the internal experiment to * {@link ExperimentStatus.States#EVALUATION}. */ - public void taskGenFinished() { + public void taskGenFinished(String sessionId) { synchronized (experimentMutex) { - if (experimentStatus != null) { + if ((experimentStatus != null) && (experimentStatus.config != null) + && (sessionId.equals(experimentStatus.config.id))) { experimentStatus.setState(ExperimentStatus.States.EVALUATION); + } else { + LOGGER.warn("Got a taskGenFinished message of {} which is not running.", sessionId); } } } @@ -654,7 +670,8 @@ public void notifyExpRuntimeExpired(ExperimentStatus expiredState) { public void stopExperimentIfRunning(String experimentId) { synchronized (experimentMutex) { // If this is the currently running experiment - if ((experimentStatus != null) && (experimentStatus.config.id.equals(experimentId))) { + if ((experimentStatus != null) && (experimentStatus.config != null) + && (experimentStatus.config.id.equals(experimentId))) { // If the experiment hasn't been stopped if (experimentStatus.getState() != States.STOPPED) { LOGGER.error("The experiment {} was stopped by the user. Forcing termination.", @@ -670,4 +687,16 @@ public void close() throws IOException { expStartTimer.cancel(); } + public boolean isExpRunning(String sessionId) { + // copy the pointer to the experiment status to make sure that we can + // read it even if another thread sets the pointer to null. This gives + // us the possibility to read the status without acquiring the + // experimentMutex. + ExperimentStatus currentStatus = experimentStatus; + // Make sure that there is an experiment running with the given ID and that has + // not been already stopped + return (currentStatus != null) && (currentStatus.config != null) && (sessionId.equals(currentStatus.config.id)) + && (currentStatus.getState() != States.STOPPED); + } + } diff --git a/platform-controller/src/main/java/org/hobbit/controller/PlatformController.java b/platform-controller/src/main/java/org/hobbit/controller/PlatformController.java index 8cf9e949..527ab4d7 100644 --- a/platform-controller/src/main/java/org/hobbit/controller/PlatformController.java +++ b/platform-controller/src/main/java/org/hobbit/controller/PlatformController.java @@ -341,10 +341,17 @@ public void receiveCommand(byte command, byte[] data, String sessionId, String r // determine the command switch (command) { case Commands.DOCKER_CONTAINER_START: { - // Convert data byte array to config data structure - StartCommandData startParams = deserializeStartCommandData(data); - // trigger creation - String containerName = createContainer(startParams); + StartCommandData startParams = null; + String containerName = ""; + if (expManager.isExpRunning(sessionId)) { + // Convert data byte array to config data structure + startParams = deserializeStartCommandData(data); + // trigger creation + containerName = createContainer(startParams); + } else { + LOGGER.error( + "Got a request to start a container for experiment {} which is either not running or was already stopped. Returning null;"); + } if (replyTo != null) { try { cmdChannel.basicPublish("", replyTo, MessageProperties.PERSISTENT_BASIC, @@ -352,7 +359,9 @@ public void receiveCommand(byte command, byte[] data, String sessionId, String r } catch (IOException e) { StringBuilder errMsgBuilder = new StringBuilder(); errMsgBuilder.append("Error, couldn't sent response after creation of container ("); - errMsgBuilder.append(startParams.toString()); + if (startParams != null) { + errMsgBuilder.append(startParams.toString()); + } errMsgBuilder.append(") to replyTo="); errMsgBuilder.append(replyTo); errMsgBuilder.append("."); @@ -369,22 +378,22 @@ public void receiveCommand(byte command, byte[] data, String sessionId, String r break; } case Commands.BENCHMARK_READY_SIGNAL: { - expManager.systemOrBenchmarkReady(false); + expManager.systemOrBenchmarkReady(false, sessionId); break; } case Commands.SYSTEM_READY_SIGNAL: { - expManager.systemOrBenchmarkReady(true); + expManager.systemOrBenchmarkReady(true, sessionId); break; } case Commands.TASK_GENERATION_FINISHED: { - expManager.taskGenFinished(); + expManager.taskGenFinished(sessionId); break; } case Commands.BENCHMARK_FINISHED_SIGNAL: { if ((data == null) || (data.length == 0)) { LOGGER.error("Got no result model from the benchmark controller."); } else { - expManager.setResultModel(data, RabbitMQUtils::readModel); + expManager.setResultModel(sessionId, data, RabbitMQUtils::readModel); } break; } @@ -679,7 +688,7 @@ public void handleFrontEndCmd(byte bytes[], String replyTo, BasicProperties repl String userName = RabbitMQUtils.readString(buffer); // Get the experiment from the queue ExperimentConfiguration config = queue.getExperiment(experimentId); - if(config == null) { + if (config == null) { // The experiment is not known response = new byte[] { 1 }; }