Skip to content

Commit

Permalink
Adapted the ExperimentManager and PlatformController to check the ses…
Browse files Browse the repository at this point in the history
…sionId of requests that have influence on the experiment status.
  • Loading branch information
MichaelRoeder committed Jun 21, 2018
1 parent 5054c74 commit e1cf351
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -312,15 +312,22 @@ protected void prefetchImages(BenchmarkMetaData benchmark, SystemMetaData system
* {@link #experimentStatus} object and therefore blocking all other operations
* on that object.
*
* @param sessionId
* the experiment ID to which the result model belongs to
* @param data
* binary data containing a serialized RDF model
* @param function
* a deserialization function transforming the binary data into an
* RDF model
*/
public void setResultModel(byte[] data, Function<? super byte[], ? extends Model> function) {
public void setResultModel(String sessionId, byte[] data, Function<? super byte[], ? extends Model> function) {
synchronized (experimentMutex) {
setResultModel_unsecured(function.apply(data));
if ((experimentStatus != null) && (experimentStatus.config != null)
&& (sessionId.equals(experimentStatus.config.id))) {
setResultModel_unsecured(function.apply(data));
} else {
LOGGER.warn("Got result model for {} which is not running.", sessionId);
}
}
}

Expand Down Expand Up @@ -523,25 +530,31 @@ public void notifyTermination(String containerId, int exitCode) {
* <code>true</code> if the message was sent by the system,
* <code>false</code> if the benchmark controller is ready
*/
public void systemOrBenchmarkReady(boolean systemReportedReady) {
public void systemOrBenchmarkReady(boolean systemReportedReady, String sessionId) {
synchronized (experimentMutex) {
// If there is an experiment waiting with the state INIT and if
// both - system and benchmark are ready
if ((experimentStatus != null) && (experimentStatus.setReadyAndCheck(systemReportedReady)
&& (experimentStatus.getState() == ExperimentStatus.States.INIT))) {
try {
startBenchmark_unsecured();
} catch (IOException e) {
// Let's retry this
if ((experimentStatus != null) && (experimentStatus.config != null)
&& (sessionId.equals(experimentStatus.config.id))) {
// If there is an experiment waiting with the state INIT and if
// both - system and benchmark are ready
if ((experimentStatus != null) && (experimentStatus.setReadyAndCheck(systemReportedReady)
&& (experimentStatus.getState() == ExperimentStatus.States.INIT))) {
try {
startBenchmark_unsecured();
} catch (IOException e2) {
LOGGER.error("Couldn't sent start signal to the benchmark controller. Terminating experiment.",
e2);
// We have to terminate the experiment
forceBenchmarkTerminate_unsecured(HobbitErrors.UnexpectedError);
} catch (IOException e) {
// Let's retry this
try {
startBenchmark_unsecured();
} catch (IOException e2) {
LOGGER.error(
"Couldn't sent start signal to the benchmark controller. Terminating experiment.",
e2);
// We have to terminate the experiment
forceBenchmarkTerminate_unsecured(HobbitErrors.UnexpectedError);
}
}
}
} else {
LOGGER.warn("Got a ready message for benchmark or system of {} which is not running.", sessionId);
}
}
}
Expand Down Expand Up @@ -610,10 +623,13 @@ public void addStatusInfo(ControllerStatus status, String userName) {
* Changes the state of the internal experiment to
* {@link ExperimentStatus.States#EVALUATION}.
*/
public void taskGenFinished() {
public void taskGenFinished(String sessionId) {
synchronized (experimentMutex) {
if (experimentStatus != null) {
if ((experimentStatus != null) && (experimentStatus.config != null)
&& (sessionId.equals(experimentStatus.config.id))) {
experimentStatus.setState(ExperimentStatus.States.EVALUATION);
} else {
LOGGER.warn("Got a taskGenFinished message of {} which is not running.", sessionId);
}
}
}
Expand Down Expand Up @@ -654,7 +670,8 @@ public void notifyExpRuntimeExpired(ExperimentStatus expiredState) {
public void stopExperimentIfRunning(String experimentId) {
synchronized (experimentMutex) {
// If this is the currently running experiment
if ((experimentStatus != null) && (experimentStatus.config.id.equals(experimentId))) {
if ((experimentStatus != null) && (experimentStatus.config != null)
&& (experimentStatus.config.id.equals(experimentId))) {
// If the experiment hasn't been stopped
if (experimentStatus.getState() != States.STOPPED) {
LOGGER.error("The experiment {} was stopped by the user. Forcing termination.",
Expand All @@ -670,4 +687,16 @@ public void close() throws IOException {
expStartTimer.cancel();
}

public boolean isExpRunning(String sessionId) {
// copy the pointer to the experiment status to make sure that we can
// read it even if another thread sets the pointer to null. This gives
// us the possibility to read the status without acquiring the
// experimentMutex.
ExperimentStatus currentStatus = experimentStatus;
// Make sure that there is an experiment running with the given ID and that has
// not been already stopped
return (currentStatus != null) && (currentStatus.config != null) && (sessionId.equals(currentStatus.config.id))
&& (currentStatus.getState() != States.STOPPED);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -341,18 +341,27 @@ public void receiveCommand(byte command, byte[] data, String sessionId, String r
// determine the command
switch (command) {
case Commands.DOCKER_CONTAINER_START: {
// Convert data byte array to config data structure
StartCommandData startParams = deserializeStartCommandData(data);
// trigger creation
String containerName = createContainer(startParams);
StartCommandData startParams = null;
String containerName = "";
if (expManager.isExpRunning(sessionId)) {
// Convert data byte array to config data structure
startParams = deserializeStartCommandData(data);
// trigger creation
containerName = createContainer(startParams);
} else {
LOGGER.error(
"Got a request to start a container for experiment {} which is either not running or was already stopped. Returning null;");
}
if (replyTo != null) {
try {
cmdChannel.basicPublish("", replyTo, MessageProperties.PERSISTENT_BASIC,
RabbitMQUtils.writeString(containerName));
} catch (IOException e) {
StringBuilder errMsgBuilder = new StringBuilder();
errMsgBuilder.append("Error, couldn't sent response after creation of container (");
errMsgBuilder.append(startParams.toString());
if (startParams != null) {
errMsgBuilder.append(startParams.toString());
}
errMsgBuilder.append(") to replyTo=");
errMsgBuilder.append(replyTo);
errMsgBuilder.append(".");
Expand All @@ -369,22 +378,22 @@ public void receiveCommand(byte command, byte[] data, String sessionId, String r
break;
}
case Commands.BENCHMARK_READY_SIGNAL: {
expManager.systemOrBenchmarkReady(false);
expManager.systemOrBenchmarkReady(false, sessionId);
break;
}
case Commands.SYSTEM_READY_SIGNAL: {
expManager.systemOrBenchmarkReady(true);
expManager.systemOrBenchmarkReady(true, sessionId);
break;
}
case Commands.TASK_GENERATION_FINISHED: {
expManager.taskGenFinished();
expManager.taskGenFinished(sessionId);
break;
}
case Commands.BENCHMARK_FINISHED_SIGNAL: {
if ((data == null) || (data.length == 0)) {
LOGGER.error("Got no result model from the benchmark controller.");
} else {
expManager.setResultModel(data, RabbitMQUtils::readModel);
expManager.setResultModel(sessionId, data, RabbitMQUtils::readModel);
}
break;
}
Expand Down Expand Up @@ -679,7 +688,7 @@ public void handleFrontEndCmd(byte bytes[], String replyTo, BasicProperties repl
String userName = RabbitMQUtils.readString(buffer);
// Get the experiment from the queue
ExperimentConfiguration config = queue.getExperiment(experimentId);
if(config == null) {
if (config == null) {
// The experiment is not known
response = new byte[] { 1 };
}
Expand Down

0 comments on commit e1cf351

Please sign in to comment.