Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client: have just one event queue and one consuming thread #198

Merged
merged 4 commits into from Nov 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -17,6 +17,8 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
Expand All @@ -30,6 +32,7 @@
import org.jboss.fuse.mvnd.common.DaemonException.StaleAddressException;
import org.jboss.fuse.mvnd.common.DaemonInfo;
import org.jboss.fuse.mvnd.common.Message;
import org.jboss.fuse.mvnd.common.Message.Prompt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -85,19 +88,27 @@ public void dispatch(Message message) throws DaemonException.ConnectException {
}
}

public Message receive() throws ConnectException, StaleAddressException {
public List<Message> receive() throws ConnectException, StaleAddressException {
while (true) {
try {
Message m = queue.poll(maxKeepAliveMs, TimeUnit.MILLISECONDS);
final Message m = queue.poll(maxKeepAliveMs, TimeUnit.MILLISECONDS);
{
Exception e = exception.get();
if (e != null) {
throw e;
} else if (m == null) {
throw new IOException("No message received within " + maxKeepAliveMs
+ "ms, daemon may have crashed. You may want to check its status using mvnd --status");
}
}
final List<Message> result = new ArrayList<>(4);
result.add(m);
queue.drainTo(result);
Exception e = exception.get();
if (e != null) {
throw e;
} else if (m != null) {
return m;
} else {
throw new IOException("No message received within " + maxKeepAliveMs
+ "ms, daemon may have crashed. You may want to check its status using mvnd --status");
}
return result;
} catch (Exception e) {
DaemonDiagnostics diag = new DaemonDiagnostics(daemon.getUid(), parameters);
LOG.debug("Problem receiving message to the daemon. Performing 'on failure' operation...");
Expand All @@ -119,6 +130,10 @@ protected void doReceive() {
if (m == null) {
break;
}
if (m.getType() == Message.PROMPT) {
final Prompt prompt = (Prompt) m;
m = prompt.withCallback(response -> dispatch(prompt.response(response)));
}
queue.put(m);
}
} catch (Exception e) {
Expand Down
Expand Up @@ -41,6 +41,7 @@
import org.jboss.fuse.mvnd.common.DaemonStopEvent;
import org.jboss.fuse.mvnd.common.Environment;
import org.jboss.fuse.mvnd.common.MavenDaemon;
import org.jboss.fuse.mvnd.common.Message;
import org.jboss.fuse.mvnd.common.Os;
import org.jboss.fuse.mvnd.common.logging.ClientOutput;
import org.slf4j.Logger;
Expand Down Expand Up @@ -85,7 +86,7 @@ public DaemonClientConnection maybeConnect(DaemonInfo daemon) {
public DaemonClientConnection connect(ClientOutput output) {
final DaemonCompatibilitySpec constraint = new DaemonCompatibilitySpec(
parameters.javaHome(), parameters.getDaemonOpts());
output.buildStatus("Looking up daemon...");
output.accept(Message.buildStatus("Looking up daemon..."));
Map<Boolean, List<DaemonInfo>> idleBusy = registry.getAll().stream()
.collect(Collectors.groupingBy(di -> di.getState() == DaemonState.Idle));
final Collection<DaemonInfo> idleDaemons = idleBusy.getOrDefault(true, Collections.emptyList());
Expand All @@ -105,7 +106,7 @@ public DaemonClientConnection connect(ClientOutput output) {

// No compatible daemons available - start a new daemon
String message = handleStopEvents(idleDaemons, busyDaemons);
output.buildStatus(message);
output.accept(Message.buildStatus(message));
return startDaemon();
}

Expand Down
Expand Up @@ -511,15 +511,15 @@ String get() {
}
}
final String result = valueSource.valueSupplier.get();
if (result != null && LOG.isDebugEnabled()) {
if (result != null && LOG.isTraceEnabled()) {
StringBuilder sb = new StringBuilder("Loaded environment value for key [")
.append(envKey.name())
.append("] from ");
valueSource.descriptionFunction.apply(sb);
sb.append(": [")
.append(result)
.append(']');
LOG.debug(sb.toString());
LOG.trace(sb.toString());
}
return result;
}
Expand Down
75 changes: 24 additions & 51 deletions client/src/main/java/org/jboss/fuse/mvnd/client/DefaultClient.java
Expand Up @@ -28,15 +28,10 @@
import org.jboss.fuse.mvnd.common.DaemonRegistry;
import org.jboss.fuse.mvnd.common.Environment;
import org.jboss.fuse.mvnd.common.Message;
import org.jboss.fuse.mvnd.common.Message.BuildEvent;
import org.jboss.fuse.mvnd.common.Message.BuildException;
import org.jboss.fuse.mvnd.common.Message.BuildMessage;
import org.jboss.fuse.mvnd.common.Message.BuildStarted;
import org.jboss.fuse.mvnd.common.OsUtils;
import org.jboss.fuse.mvnd.common.logging.ClientOutput;
import org.jboss.fuse.mvnd.common.logging.TerminalOutput;
import org.jline.terminal.Terminal;
import org.jline.terminal.impl.AbstractPosixTerminal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -127,15 +122,9 @@ public ExecutionResult execute(ClientOutput output, List<String> argv) {
+ "-" + buildProperties.getOsArch()
+ nativeSuffix)
.reset().toString();
output.accept(null, v);
output.accept(Message.log(v));
// Print terminal information
Terminal terminal = output.getTerminal();
StringBuilder sb = new StringBuilder();
sb.append("Terminal: ").append(terminal != null ? terminal.getClass().getName() : null);
if (terminal instanceof AbstractPosixTerminal) {
sb.append(" with pty ").append(((AbstractPosixTerminal) terminal).getPty().getClass().getName());
}
output.accept(null, sb.toString());
output.describeTerminal();
/*
* Do not return, rather pass -v to the server so that the client module does not need to depend on any
* Maven artifacts
Expand All @@ -146,20 +135,20 @@ public ExecutionResult execute(ClientOutput output, List<String> argv) {
boolean status = args.remove("--status");
if (status) {
final String template = " %36s %7s %5s %7s %5s %23s %s";
output.accept(null, String.format(template,
"UUID", "PID", "Port", "Status", "RSS", "Last activity", "Java home"));
output.accept(Message.log(String.format(template,
"UUID", "PID", "Port", "Status", "RSS", "Last activity", "Java home")));
for (DaemonInfo d : registry.getAll()) {
if (ProcessHandle.of(d.getPid()).isEmpty()) {
/* The process does not exist anymore - remove it from the registry */
registry.remove(d.getUid());
} else {
output.accept(null, String.format(template,
output.accept(Message.log(String.format(template,
d.getUid(), d.getPid(), d.getAddress(), d.getState(),
OsUtils.kbTohumanReadable(OsUtils.findProcessRssInKb(d.getPid())),
LocalDateTime.ofInstant(
Instant.ofEpochMilli(Math.max(d.getLastIdle(), d.getLastBusy())),
ZoneId.systemDefault()),
d.getJavaHome()));
d.getJavaHome())));
}
}
return new DefaultResult(argv, null);
Expand All @@ -168,7 +157,7 @@ public ExecutionResult execute(ClientOutput output, List<String> argv) {
if (stop) {
DaemonInfo[] dis = registry.getAll().toArray(new DaemonInfo[0]);
if (dis.length > 0) {
output.accept(null, "Stopping " + dis.length + " running daemons");
output.accept(Message.log("Stopping " + dis.length + " running daemons"));
for (DaemonInfo di : dis) {
try {
ProcessHandle.of(di.getPid()).ifPresent(ProcessHandle::destroyForcibly);
Expand Down Expand Up @@ -202,52 +191,36 @@ public ExecutionResult execute(ClientOutput output, List<String> argv) {

final DaemonConnector connector = new DaemonConnector(parameters, registry);
try (DaemonClientConnection daemon = connector.connect(output)) {
output.buildStatus("Connected to daemon");
output.accept(Message.buildStatus("Connected to daemon"));

daemon.dispatch(new Message.BuildRequest(
args,
parameters.userDir().toString(),
parameters.multiModuleProjectDirectory().toString(),
System.getenv()));

output.buildStatus("Build request sent");
output.accept(Message.buildStatus("Build request sent"));

while (true) {
Message m = daemon.receive();
if (m instanceof BuildException) {
final BuildException e = (BuildException) m;
output.error(e.getMessage(), e.getClassName(), e.getStackTrace());
return new DefaultResult(argv,
new Exception(e.getClassName() + ": " + e.getMessage() + "\n" + e.getStackTrace()));
} else if (m instanceof BuildStarted) {
final BuildStarted bs = (BuildStarted) m;
output.startBuild(bs.getProjectId(), bs.getProjectCount(), bs.getMaxThreads());
} else if (m instanceof BuildEvent) {
BuildEvent be = (BuildEvent) m;
switch (be.getType()) {
case BuildStopped:
final List<Message> messages = daemon.receive();
for (int i = 0; i < messages.size(); i++) {
Message m = messages.get(i);
switch (m.getType()) {
case Message.BUILD_EXCEPTION: {
output.accept(messages.subList(0, i + 1));
final BuildException e = (BuildException) m;
return new DefaultResult(argv,
new Exception(e.getClassName() + ": " + e.getMessage() + "\n" + e.getStackTrace()));
}
case Message.BUILD_STOPPED: {
output.accept(messages.subList(0, i));
return new DefaultResult(argv, null);
case ProjectStarted:
case MojoStarted:
output.projectStateChanged(be.getProjectId(), be.getDisplay());
break;
case ProjectStopped:
output.projectFinished(be.getProjectId());
}
default:
break;
}
} else if (m instanceof BuildMessage) {
BuildMessage bm = (BuildMessage) m;
output.accept(bm.getProjectId(), bm.getMessage());
} else if (m == Message.KEEP_ALIVE_SINGLETON) {
output.keepAlive();
} else if (m instanceof Message.Display) {
Message.Display d = (Message.Display) m;
output.display(d.getProjectId(), d.getMessage());
} else if (m instanceof Message.Prompt) {
Message.Prompt p = (Message.Prompt) m;
String response = output.prompt(p.getProjectId(), p.getMessage(), p.isPassword());
daemon.dispatch(new Message.PromptResponse(p.getProjectId(), p.getUid(), response));
}
output.accept(messages);
}
}
}
Expand Down