Skip to content

Commit

Permalink
Implement build cancellation, fixes apache#127
Browse files Browse the repository at this point in the history
  • Loading branch information
gnodet committed Nov 9, 2020
1 parent e37d245 commit 9478715
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 29 deletions.
Expand Up @@ -135,6 +135,15 @@ public void close() {
connection.close();
}

public void cancel() {
dispatch(Message.CANCEL_SINGLETON);
try {
queue.put(Message.CANCEL_SINGLETON);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

public interface StaleAddressDetector {
/**
* @return true if the failure should be considered due to a stale address.
Expand Down
Expand Up @@ -63,26 +63,34 @@ public class DaemonConnector {

private final DaemonRegistry registry;
private final DaemonParameters parameters;
private boolean canceled;
private Thread thread;

public DaemonConnector(DaemonParameters parameters, DaemonRegistry registry) {
this.parameters = parameters;
this.registry = registry;
}

public DaemonClientConnection maybeConnect(DaemonCompatibilitySpec constraint) {
return findConnection(getCompatibleDaemons(registry.getAll(), constraint));
public void cancel() {
canceled = true;
Thread thread = this.thread;
if (thread != null) {
thread.interrupt();
}
}

public DaemonClientConnection maybeConnect(DaemonInfo daemon) {
public DaemonClientConnection connect(ClientOutput output) {
canceled = false;
thread = Thread.currentThread();
try {
return connectToDaemon(daemon, new CleanupOnStaleAddress(daemon), false);
} catch (DaemonException.ConnectException e) {
LOGGER.debug("Cannot connect to daemon {} due to {}. Ignoring.", daemon, e);
return doConnect(output);
} finally {
thread = null;
canceled = false;
}
return null;
}

public DaemonClientConnection connect(ClientOutput output) {
protected DaemonClientConnection doConnect(ClientOutput output) {
final DaemonCompatibilitySpec constraint = new DaemonCompatibilitySpec(
parameters.javaHome(), parameters.getDaemonOpts());
output.buildStatus("Looking up daemon...");
Expand Down
21 changes: 17 additions & 4 deletions client/src/main/java/org/jboss/fuse/mvnd/client/DefaultClient.java
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import org.fusesource.jansi.Ansi;
import org.jboss.fuse.mvnd.common.BuildProperties;
import org.jboss.fuse.mvnd.common.DaemonException;
import org.jboss.fuse.mvnd.common.DaemonInfo;
import org.jboss.fuse.mvnd.common.DaemonRegistry;
import org.jboss.fuse.mvnd.common.Environment;
Expand All @@ -37,13 +38,13 @@
import org.jboss.fuse.mvnd.common.logging.TerminalOutput;
import org.jline.terminal.Terminal;
import org.jline.terminal.impl.AbstractPosixTerminal;
import org.jline.utils.AttributedString;
import org.jline.utils.AttributedStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultClient implements Client {

public static final int CANCEL_TIMEOUT = 10 * 1000;

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultClient.class);

private final DaemonParameters parameters;
Expand All @@ -67,7 +68,12 @@ public static void main(String[] argv) throws Exception {
}

try (TerminalOutput output = new TerminalOutput(logFile)) {
new DefaultClient(new DaemonParameters()).execute(output, args);
try {
new DefaultClient(new DaemonParameters()).execute(output, args);
} catch (DaemonException.InterruptedException e) {
final AttributedStyle s = new AttributedStyle().bold().foreground(AttributedStyle.RED);
new AttributedString(System.lineSeparator() + "Canceled !!!", s).println(output.getTerminal());
}
}
}

Expand Down Expand Up @@ -201,6 +207,8 @@ public ExecutionResult execute(ClientOutput output, List<String> argv) {
}

final DaemonConnector connector = new DaemonConnector(parameters, registry);
output.onInterrupt(connector::cancel);

try (DaemonClientConnection daemon = connector.connect(output)) {
output.buildStatus("Connected to daemon");

Expand All @@ -212,9 +220,14 @@ public ExecutionResult execute(ClientOutput output, List<String> argv) {

output.buildStatus("Build request sent");

output.onInterrupt(daemon::cancel);

while (true) {
Message m = daemon.receive();
if (m instanceof BuildException) {
if (m == Message.CANCEL_SINGLETON) {
output.cancel();
return null;
} else if (m instanceof BuildException) {
final BuildException e = (BuildException) m;
output.error(e.getMessage(), e.getClassName(), e.getStackTrace());
return new DefaultResult(argv,
Expand Down
4 changes: 4 additions & 0 deletions common/src/main/java/org/jboss/fuse/mvnd/common/Message.java
Expand Up @@ -38,9 +38,11 @@ public abstract class Message {
static final int DISPLAY = 7;
static final int PROMPT = 8;
static final int PROMPT_RESPONSE = 9;
static final int CANCEL = 10;

public static final SimpleMessage KEEP_ALIVE_SINGLETON = new SimpleMessage(Message.KEEP_ALIVE, "KEEP_ALIVE");
public static final SimpleMessage STOP_SINGLETON = new SimpleMessage(Message.STOP, "STOP");
public static final SimpleMessage CANCEL_SINGLETON = new SimpleMessage(Message.CANCEL, "CANCEL");

public static Message read(DataInputStream input) throws IOException {
int type = input.read();
Expand Down Expand Up @@ -68,6 +70,8 @@ public static Message read(DataInputStream input) throws IOException {
return Prompt.read(input);
case PROMPT_RESPONSE:
return PromptResponse.read(input);
case CANCEL:
return SimpleMessage.CANCEL_SINGLETON;
}
throw new IllegalStateException("Unexpected message type: " + type);
}
Expand Down
Expand Up @@ -41,4 +41,8 @@ public interface ClientOutput extends AutoCloseable {
void display(String projectId, String message);

String prompt(String projectId, String message, boolean password);

void onInterrupt(Runnable runnable);

void cancel();
}
Expand Up @@ -56,6 +56,7 @@ public class TerminalOutput implements ClientOutput {

private final BlockingQueue<Event> queue;
private final Terminal terminal;
private final Terminal.SignalHandler previousIntHandler;
private final Display display;
private final LinkedHashMap<String, Project> projects = new LinkedHashMap<>();
private final ClientLog log;
Expand All @@ -71,6 +72,7 @@ public class TerminalOutput implements ClientOutput {
private volatile int totalProjects;
private volatile int maxThreads;

private Runnable interruptHandler;
private int linesPerProject = 0; // read/written only by the displayLoop
private int doneProjects = 0; // read/written only by the displayLoop
private String buildStatus; // read/written only by the displayLoop
Expand All @@ -87,11 +89,13 @@ enum EventType {
KEEP_ALIVE,
DISPLAY,
PROMPT,
PROMPT_PASSWORD
PROMPT_PASSWORD,
CANCEL
}

static class Event {
public static final Event KEEP_ALIVE = new Event(EventType.KEEP_ALIVE, null, null);
public static final Event CANCEL = new Event(EventType.CANCEL, null, "Canceled !!!");
public final EventType type;
public final String projectId;
public final String message;
Expand Down Expand Up @@ -128,6 +132,7 @@ public TerminalOutput(Path logFile) throws IOException {
this.queue = new LinkedBlockingDeque<>();
this.terminal = TerminalBuilder.terminal();
terminal.enterRawMode();
this.previousIntHandler = terminal.handle(Terminal.Signal.INT, sig -> handleInterrupt());
this.display = new Display(terminal, false);
this.log = logFile == null ? new MessageCollector() : new FileLog(logFile);
final Thread w = new Thread(this::displayLoop);
Expand All @@ -138,6 +143,23 @@ public TerminalOutput(Path logFile) throws IOException {
this.reader = r;
}

protected void handleInterrupt() {
this.interruptHandler.run();
}

public void onInterrupt(Runnable run) {
this.interruptHandler = run;
}

@Override
public void cancel() {
try {
queue.put(Event.CANCEL);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

public Terminal getTerminal() {
return terminal;
}
Expand Down Expand Up @@ -290,6 +312,7 @@ void displayLoop() {
}
break;
}
case CANCEL:
case ERROR: {
projects.values().stream().flatMap(p -> p.log.stream()).forEach(log);
clearDisplay();
Expand Down Expand Up @@ -395,6 +418,7 @@ public void close() throws Exception {
queue.put(new Event(EventType.END_OF_STREAM, null, null));
worker.join();
reader.join();
terminal.handle(Terminal.Signal.INT, previousIntHandler);
terminal.close();
closed.countDown();
if (exception != null) {
Expand Down
Expand Up @@ -88,6 +88,10 @@ public void shutdown() {
executor.shutdown();
}

public void cancel() {
executor.shutdownNow();
}

// hook to allow pausing executor during unit tests
protected void beforeExecute(Thread t, Runnable r) {
}
Expand Down
48 changes: 43 additions & 5 deletions daemon/src/main/java/org/jboss/fuse/mvnd/builder/SmartBuilder.java
Expand Up @@ -70,16 +70,47 @@ public class SmartBuilder implements Builder {

private final LifecycleModuleBuilder moduleBuilder;

private volatile SmartBuilderImpl builder;
private volatile boolean canceled;

private static SmartBuilder INSTANCE;

public static void cancel() {
SmartBuilder builder = INSTANCE;
if (builder != null) {
builder.doCancel();
}
}

public static void doneCancel() {
SmartBuilder builder = INSTANCE;
if (builder != null) {
builder.doDoneCancel();
}
}

@Inject
public SmartBuilder(LifecycleModuleBuilder moduleBuilder) {
this.moduleBuilder = moduleBuilder;
INSTANCE = this;
}

void doCancel() {
canceled = true;
SmartBuilderImpl b = builder;
if (b != null) {
b.cancel();
}
}

void doDoneCancel() {
canceled = false;
}

@Override
public void build(final MavenSession session, final ReactorContext reactorContext,
public synchronized void build(final MavenSession session, final ReactorContext reactorContext,
ProjectBuildList projectBuilds, final List<TaskSegment> taskSegments,
ReactorBuildStatus reactorBuildStatus) throws ExecutionException, InterruptedException {

List<String> list = new ArrayList<>();

String providerScript = null;
Expand Down Expand Up @@ -165,9 +196,16 @@ public void build(final MavenSession session, final ReactorContext reactorContex
List<Map.Entry<TaskSegment, ReactorBuildStats>> allstats = new ArrayList<>();
for (TaskSegment taskSegment : taskSegments) {
Set<MavenProject> projects = projectBuilds.getByTaskSegment(taskSegment).getProjects();
ReactorBuildStats stats = new SmartBuilderImpl(moduleBuilder, session, reactorContext, taskSegment, projects, graph)
.build();
allstats.add(new AbstractMap.SimpleEntry<>(taskSegment, stats));
if (canceled) {
return;
}
builder = new SmartBuilderImpl(moduleBuilder, session, reactorContext, taskSegment, projects, graph);
try {
ReactorBuildStats stats = builder.build();
allstats.add(new AbstractMap.SimpleEntry<>(taskSegment, stats));
} finally {
builder = null;
}
}

if (session.getResult().hasExceptions()) {
Expand Down
Expand Up @@ -157,6 +157,10 @@ private void shutdown() {
executor.shutdown();
}

public void cancel() {
executor.cancel();
}

private void submitAll(Set<MavenProject> readyProjects) {
List<ProjectBuildTask> tasks = new ArrayList<>();
for (MavenProject project : readyProjects) {
Expand Down

0 comments on commit 9478715

Please sign in to comment.