Skip to content

Commit

Permalink
More use of lambdas and streams
Browse files Browse the repository at this point in the history
Also got rid of an interface that is never implemented
  • Loading branch information
dkfellows committed May 10, 2022
1 parent 79dad23 commit a918e4c
Show file tree
Hide file tree
Showing 11 changed files with 208 additions and 265 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,9 @@
import uk.ac.manchester.cs.spinnaker.job.nmpi.QueueEmpty;
import uk.ac.manchester.cs.spinnaker.job.nmpi.QueueNextResponse;
import uk.ac.manchester.cs.spinnaker.rest.NMPIQueue;
import uk.ac.manchester.cs.spinnaker.rest.utils.CustomJacksonJsonProvider;
import uk.ac.manchester.cs.spinnaker.rest.utils.PropertyBasedDeserialiser;

/**
* Testing.
*
*/
public final class TestRestClient {

Expand All @@ -48,34 +45,13 @@ private TestRestClient() {
* @throws Exception if anything goes wrong
*/
public static void main(final String[] args) throws Exception {
final CustomJacksonJsonProvider provider =
new CustomJacksonJsonProvider();

/**
* How to understand messages coming from the queue.
*/
@SuppressWarnings("serial")
class QueueResponseDeserialiser
extends PropertyBasedDeserialiser<QueueNextResponse> {
/**
* Make a deserialiser.
*/
QueueResponseDeserialiser() {
super(QueueNextResponse.class);
register("id", Job.class);
register("warning", QueueEmpty.class);
}
}
provider.addDeserialiser(QueueNextResponse.class,
new QueueResponseDeserialiser());

URL nmpiUrl = new URL("https://nmpi.hbpneuromorphic.eu/");
String nmpiUsername = "uman";
String apiKey = "QWvPf6WzISelx7MhIJqzoi-BgZqj95PPJYnpBuLTKcGN5b8sbP9"
+ "fiUR2UQ6I--PHuoeOIeF0tmKptKC5rbIMRiRlGGG51zDvRDzqoIVTm4LU6L"
+ "fV8MXYRlzXi4Dc75w-";
NMPIQueue queue = createApiKeyClient(nmpiUrl, nmpiUsername, apiKey,
NMPIQueue.class, provider);
NMPIQueue.class, NMPIQueue.createProvider());
QueueNextResponse response = queue.getNextJob("SpiNNaker");
if (response instanceof QueueEmpty) {
System.err.println("No items in queue");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
* @see XenVMExecuterFactory.Executer
*/
public interface JobExecuter {

/**
* Gets the id of the executer.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,23 @@
import static java.io.File.createTempFile;
import static java.lang.Math.ceil;
import static java.util.Arrays.asList;
import static java.util.Comparator.comparing;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.stream.Collectors.toList;
import static javax.ws.rs.core.Response.ok;
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.MediaType.APPLICATION_OCTET_STREAM;
import static org.apache.commons.io.FileUtils.copyInputStreamToFile;
import static org.apache.commons.io.FileUtils.forceDelete;
import static org.apache.commons.io.FileUtils.forceMkdir;
import static org.apache.commons.io.FileUtils.forceMkdirParent;
import static org.apache.commons.io.FileUtils.listFiles;
import static org.slf4j.LoggerFactory.getLogger;
import static uk.ac.manchester.cs.spinnaker.nmpi.NMPIQueueManager.STATUS_QUEUED;
import static uk.ac.manchester.cs.spinnaker.nmpi.NMPIQueueManager.STATUS_RUNNING;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -241,7 +247,7 @@ private void startManager() {
@PreDestroy
private void stopManager() {
scheduler.shutdown();
queueManager.close(); // Stops the worker thread eventually
queueManager.close(); // Stops the queue manager thread eventually
}

@Override
Expand Down Expand Up @@ -305,15 +311,9 @@ public Job getNextJob(final String executerId) {
public SpinnakerMachine getLargestJobMachine(final int id,
final double runTime) {
// TODO Check quota to get the largest machine within the quota

SpinnakerMachine largest = null;
for (final SpinnakerMachine machine : machineManager.getMachines()) {
if ((largest == null) || (machine.getArea() > largest.getArea())) {
largest = machine;
}
}

return largest;
return machineManager.getMachines().stream()
.max(comparing(SpinnakerMachine::getArea))
.orElse(null);
}

@Override
Expand Down Expand Up @@ -402,7 +402,8 @@ private SpinnakerMachine findMachine(final int id,
}
}
throw new WebApplicationException(
"Machine " + machineName + " does not exist for job " + id);
"Machine " + machineName + " does not exist for job " + id,
BAD_REQUEST);
}

@Override
Expand Down Expand Up @@ -479,23 +480,16 @@ public JobMachineAllocated checkMachineLease(final int id,
final List<SpinnakerMachine> machines = getMachineForJob(id);

// Return false if any machine is gone
for (final SpinnakerMachine machine : machines) {
if (!machineManager.isMachineAvailable(machine)) {
return new JobMachineAllocated(false);
}
if (!machines.stream().allMatch(machineManager::isMachineAvailable)) {
return new JobMachineAllocated(false);
}

// Wait for the state change of any machine
waitForAnyMachineStateChange(waitTime, machines);

// Again check for a machine which is gone
for (final SpinnakerMachine machine : machines) {
if (!machineManager.isMachineAvailable(machine)) {
return new JobMachineAllocated(false);
}
}

return new JobMachineAllocated(true);
return new JobMachineAllocated(machines.stream()
.allMatch(machineManager::isMachineAvailable));
}

/**
Expand Down Expand Up @@ -576,10 +570,8 @@ private List<DataItem> getOutputFiles(final String projectId, final int id,
throws IOException {
final List<DataItem> outputItems = new ArrayList<>();
if (outputs != null) {
final List<File> outputFiles = new ArrayList<>();
for (final String filename : outputs) {
outputFiles.add(new File(filename));
}
final List<File> outputFiles =
outputs.stream().map(File::new).collect(toList());
outputItems.addAll(outputManager.addOutputs(projectId, id,
new File(baseFile), outputFiles));
}
Expand Down Expand Up @@ -697,9 +689,7 @@ private boolean releaseAllocatedMachines(final int id) {
final List<SpinnakerMachine> machines =
allocatedMachines.remove(id);
if (machines != null) {
for (final SpinnakerMachine machine : machines) {
machineManager.releaseMachine(machine);
}
machines.forEach(machineManager::releaseMachine);
}
return machines != null;
}
Expand Down Expand Up @@ -735,12 +725,6 @@ public void setJobError(final String projectId, final int id,
}
}

/**
* An empty stack trace element.
*/
private static final StackTraceElement[] STE_TMPL =
new StackTraceElement[0];

/**
* Convert a remote exception to a local one.
*
Expand All @@ -750,13 +734,10 @@ public void setJobError(final String projectId, final int id,
*/
private Exception reconstructRemoteException(final String error,
final RemoteStackTrace stackTrace) {
final ArrayList<StackTraceElement> elements = new ArrayList<>();
for (final RemoteStackTraceElement element : stackTrace.getElements()) {
elements.add(element.toSTE());
}

final Exception exception = new Exception(error);
exception.setStackTrace(elements.toArray(STE_TMPL));
exception.setStackTrace(stackTrace.getElements().stream()
.map(RemoteStackTraceElement::toSTE)
.toArray(StackTraceElement[]::new));
return exception;
}

Expand All @@ -779,16 +760,16 @@ public void setExecutorExited(final String executorId,
final int id = job.getId();
logger.debug("Executer {} for Job {} has exited", executorId, id);

String status = job.getStatus();
if (status == NMPIQueueManager.STATUS_QUEUED
|| status == NMPIQueueManager.STATUS_RUNNING) {
switch (job.getStatus()) {
case STATUS_QUEUED:
case STATUS_RUNNING:
logger.debug("Job {} has not exited cleanly", id);
releaseAllocatedMachines(id);
final long resourceUsage = getResourceUsage(id);
final ObjectNode prov = getProvenance(id);
try {
final String projectId =
new File(job.getCollabId()).getName();
new File(job.getCollabId()).getName();
queueManager.setJobError(id, logToAppend,
getOutputFiles(projectId, id, null, null),
new Exception("Job did not finish cleanly"),
Expand Down Expand Up @@ -820,15 +801,13 @@ public Response getJobProcessManager() {
throw new UnsatisfiedLinkError(
JOB_PROCESS_MANAGER_ZIP + " not found in classpath");
}
return Response.ok(jobManagerStream).type(APPLICATION_ZIP).build();
return ok(jobManagerStream).type(APPLICATION_ZIP).build();
}

@Override
public Response getSetupScript() throws IOException {
final InputStream setupScriptStream =
setupScript.getInputStream();
return Response.ok(setupScriptStream).type(
APPLICATION_OCTET_STREAM).build();
return ok(setupScript.getInputStream())
.type(APPLICATION_OCTET_STREAM).build();
}

/**
Expand All @@ -838,9 +817,8 @@ private void updateStatus() {
int nBoardsInUse = 0;
synchronized (allocatedMachines) {
for (List<SpinnakerMachine> machines: allocatedMachines.values()) {
for (SpinnakerMachine machine : machines) {
nBoardsInUse += machine.getnBoards();
}
nBoardsInUse += machines.stream()
.mapToInt(SpinnakerMachine::getnBoards).sum();
}
}
statusMonitorManager.updateStatus(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static java.io.File.createTempFile;
import static java.io.File.pathSeparator;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;
import static org.apache.commons.io.FileUtils.copyToFile;
import static org.apache.commons.io.FileUtils.forceDeleteOnExit;
import static org.apache.commons.io.FileUtils.forceMkdirParent;
Expand All @@ -33,6 +34,7 @@
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -42,6 +44,7 @@

import javax.annotation.PostConstruct;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Value;

Expand Down Expand Up @@ -184,7 +187,7 @@ public JobExecuter createJobExecuter(final JobManager manager,
/**
* The executer thread.
*/
class Executer implements JobExecuter {
protected class Executer implements JobExecuter {

/**
* The job manager to report to.
Expand Down Expand Up @@ -276,15 +279,11 @@ private List<String> constructArguments() {
final List<String> command = new ArrayList<>();
command.add(javaExec.getAbsolutePath());

final StringBuilder classPathBuilder = new StringBuilder();
String separator = "";
for (final File file : jobProcessManagerClasspath) {
classPathBuilder.append(separator).append(file);
separator = pathSeparator;
}
String classPath = jobProcessManagerClasspath.stream()
.map(File::toString).collect(joining(pathSeparator));
command.add("-cp");
command.add(classPathBuilder.toString());
logger.debug("Classpath: {}", classPathBuilder);
command.add(classPath);
logger.debug("Classpath: {}", classPath);

command.add(JOB_PROCESS_MANAGER_MAIN_CLASS);
logger.debug("Main command: {}", JOB_PROCESS_MANAGER_MAIN_CLASS);
Expand Down Expand Up @@ -330,20 +329,13 @@ private JobOutputPipe startSubprocess(final List<String> command) {
* Report the results of the job using the log.
*/
private void reportResult() {
final StringBuilder logToAppend = new StringBuilder();
try (BufferedReader reader =
new BufferedReader(new FileReader(outputLog))) {
while (true) {
final String line = reader.readLine();
if (line == null) {
break;
}
logToAppend.append(line).append("\n");
}
StringWriter loggedOutput = new StringWriter();
try (FileReader reader = new FileReader(outputLog)) {
IOUtils.copy(reader, loggedOutput);
} catch (final IOException e) {
logger.warn("problem in reporting log", e);
}
jobManager.setExecutorExited(id, logToAppend.toString());
jobManager.setExecutorExited(id, loggedOutput.toString());
}

/**
Expand All @@ -353,7 +345,7 @@ private void reportResult() {
* @throws IOException If the output stream of the process can't be
* obtained
*/
public OutputStream getProcessOutputStream() throws IOException {
OutputStream getProcessOutputStream() throws IOException {
synchronized (this) {
while ((process == null) && (startException == null)) {
try {
Expand All @@ -374,7 +366,7 @@ public OutputStream getProcessOutputStream() throws IOException {
*
* @return The location of the log file
*/
public File getLogFile() {
File getLogFile() {
return outputLog;
}
}
Expand Down Expand Up @@ -416,19 +408,19 @@ class JobOutputPipe extends Thread implements AutoCloseable {
setDaemon(true);
}

private String readLine() {
try {
return reader.readLine();
} catch (final IOException e) {
return null;
}
}

@Override
public void run() {
try {
while (!done) {
String line;
try {
line = reader.readLine();
} catch (final IOException e) {
break;
}
if (line == null) {
break;
}
String line;
while (!done && (line = readLine()) != null) {
if (!line.isEmpty()) {
logger.debug("{}", line);
writer.println(line);
Expand Down
Loading

0 comments on commit a918e4c

Please sign in to comment.