Skip to content

Commit

Permalink
Implement a keep-alive strategy to detect stales connections, fixes #47
Browse files Browse the repository at this point in the history
  • Loading branch information
gnodet committed Oct 22, 2020
1 parent 7dbb371 commit 00db8fd
Show file tree
Hide file tree
Showing 20 changed files with 599 additions and 86 deletions.
41 changes: 38 additions & 3 deletions client/src/main/java/org/jboss/fuse/mvnd/client/ClientLayout.java
Expand Up @@ -38,6 +38,9 @@ public class ClientLayout extends Layout {
private final Path settings;
private final Path javaHome;
private final Path logbackConfigurationPath;
private final int idleTimeoutMs;
private final int keepAliveMs;
private final int maxLostKeepAlive;

public static ClientLayout getEnvInstance() {
if (ENV_INSTANCE == null) {
Expand Down Expand Up @@ -68,6 +71,21 @@ public static ClientLayout getEnvInstance() {
.orFail()
.asPath()
.toAbsolutePath().normalize();
final int idleTimeoutMs = Environment.DAEMON_IDLE_TIMEOUT_MS
.systemProperty()
.orLocalProperty(mvndProperties, mvndPropertiesPath)
.orDefault(() -> Integer.toString(Environment.DEFAULT_IDLE_TIMEOUT))
.asInt();
final int keepAliveMs = Environment.DAEMON_KEEP_ALIVE_MS
.systemProperty()
.orLocalProperty(mvndProperties, mvndPropertiesPath)
.orDefault(() -> Integer.toString(Environment.DEFAULT_KEEP_ALIVE))
.asInt();
final int maxLostKeepAlive = Environment.DAEMON_MAX_LOST_KEEP_ALIVE
.systemProperty()
.orLocalProperty(mvndProperties, mvndPropertiesPath)
.orDefault(() -> Integer.toString(Environment.DEFAULT_MAX_LOST_KEEP_ALIVE))
.asInt();
ENV_INSTANCE = new ClientLayout(
mvndPropertiesPath,
mvndHome,
Expand All @@ -76,18 +94,23 @@ public static ClientLayout getEnvInstance() {
Environment.findJavaHome(mvndProperties, mvndPropertiesPath),
findLocalRepo(),
null,
Environment.findLogbackConfigurationPath(mvndProperties, mvndPropertiesPath, mvndHome));
Environment.findLogbackConfigurationPath(mvndProperties, mvndPropertiesPath, mvndHome),
idleTimeoutMs, keepAliveMs, maxLostKeepAlive);
}
return ENV_INSTANCE;
}

public ClientLayout(Path mvndPropertiesPath, Path mavenHome, Path userDir, Path multiModuleProjectDirectory, Path javaHome,
Path localMavenRepository, Path settings, Path logbackConfigurationPath) {
Path localMavenRepository, Path settings, Path logbackConfigurationPath, int idleTimeoutMs, int keepAliveMs,
int maxLostKeepAlive) {
super(mvndPropertiesPath, mavenHome, userDir, multiModuleProjectDirectory);
this.localMavenRepository = localMavenRepository;
this.settings = settings;
this.javaHome = Objects.requireNonNull(javaHome, "javaHome");
this.logbackConfigurationPath = logbackConfigurationPath;
this.idleTimeoutMs = idleTimeoutMs;
this.keepAliveMs = keepAliveMs;
this.maxLostKeepAlive = maxLostKeepAlive;
}

/**
Expand All @@ -96,7 +119,7 @@ public ClientLayout(Path mvndPropertiesPath, Path mavenHome, Path userDir, Path
*/
public ClientLayout cd(Path newUserDir) {
return new ClientLayout(mvndPropertiesPath, mavenHome, newUserDir, multiModuleProjectDirectory, javaHome,
localMavenRepository, settings, logbackConfigurationPath);
localMavenRepository, settings, logbackConfigurationPath, idleTimeoutMs, keepAliveMs, maxLostKeepAlive);
}

/**
Expand All @@ -122,6 +145,18 @@ public Path getLogbackConfigurationPath() {
return logbackConfigurationPath;
}

public int getIdleTimeoutMs() {
return idleTimeoutMs;
}

public int getKeepAliveMs() {
return keepAliveMs;
}

public int getMaxLostKeepAlive() {
return maxLostKeepAlive;
}

static Path findLocalRepo() {
return Environment.MAVEN_REPO_LOCAL.systemProperty().asPath();
}
Expand Down
Expand Up @@ -15,6 +15,12 @@
*/
package org.jboss.fuse.mvnd.client;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jboss.fuse.mvnd.common.DaemonConnection;
Expand All @@ -31,21 +37,30 @@
* File origin:
* https://github.com/gradle/gradle/blob/v5.6.2/subprojects/launcher/src/main/java/org/gradle/launcher/daemon/client/DaemonClientConnection.java
*/
public class DaemonClientConnection {
public class DaemonClientConnection implements Closeable {

private final static Logger LOG = LoggerFactory.getLogger(DaemonClientConnection.class);

private final DaemonConnection<Message> connection;
private final DaemonInfo daemon;
private final StaleAddressDetector staleAddressDetector;
private final boolean newDaemon;
private boolean hasReceived;
private final Lock dispatchLock = new ReentrantLock();
private final int maxKeepAliveMs;
private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(16);
private final Thread receiver;
private final AtomicBoolean running = new AtomicBoolean(true);

public DaemonClientConnection(DaemonConnection<Message> connection, DaemonInfo daemon,
StaleAddressDetector staleAddressDetector) {
StaleAddressDetector staleAddressDetector, boolean newDaemon, int maxKeepAliveMs) {
this.connection = connection;
this.daemon = daemon;
this.staleAddressDetector = staleAddressDetector;
this.newDaemon = newDaemon;
this.maxKeepAliveMs = maxKeepAliveMs;
this.receiver = new Thread(this::doReceive);
this.receiver.start();
}

public DaemonInfo getDaemon() {
Expand All @@ -71,22 +86,56 @@ public void dispatch(Message message) throws DaemonException.ConnectException {
}
}

public Message receive() throws DaemonException.ConnectException {
public Message receive() throws ConnectException, StaleAddressException {
while (true) {
try {
Object o = queue.poll(maxKeepAliveMs, TimeUnit.MILLISECONDS);
if (o instanceof Exception) {
throw (Exception) o;
} else {
return (Message) o;
}
} catch (Exception e) {
LOG.debug("Problem receiving message to the daemon. Performing 'on failure' operation...");
if (!hasReceived && newDaemon) {
throw new ConnectException("Could not receive a message from the daemon.", e);
} else if (staleAddressDetector.maybeStaleAddress(e)) {
throw new StaleAddressException("Could not receive a message from the daemon.", e);
}
} finally {
hasReceived = true;
}
}
}

protected void doReceive() {
try {
return connection.receive();
} catch (DaemonException.MessageIOException e) {
LOG.debug("Problem receiving message to the daemon. Performing 'on failure' operation...");
if (!hasReceived && staleAddressDetector.maybeStaleAddress(e)) {
throw new DaemonException.StaleAddressException("Could not receive a message from the daemon.", e);
while (running.get()) {
Message m;
try {
m = connection.receive();
} catch (MessageIOException e) {
queue.put(e);
return;
}
if (m != null) {
queue.put(m);
} else {
queue.put(new IOException("No message received within " + maxKeepAliveMs + "ms, daemon may have crashed"));
return;
}
}
} catch (InterruptedException e) {
if (running.get()) {
throw new RuntimeException(e);
}
throw new DaemonException.ConnectException("Could not receive a message from the daemon.", e);
} finally {
hasReceived = true;
}
}

public void stop() {
public void close() {
LOG.debug("thread {}: connection stop", Thread.currentThread().getId());
running.set(false);
receiver.interrupt();
connection.close();
}

Expand Down
Expand Up @@ -90,7 +90,7 @@ public DaemonClientConnection maybeConnect(DaemonCompatibilitySpec constraint) {

public DaemonClientConnection maybeConnect(DaemonInfo daemon) {
try {
return connectToDaemon(daemon, new CleanupOnStaleAddress(daemon, true));
return connectToDaemon(daemon, new CleanupOnStaleAddress(daemon), false);
} catch (DaemonException.ConnectException e) {
LOGGER.debug("Cannot connect to daemon {} due to {}. Ignoring.", daemon, e);
}
Expand Down Expand Up @@ -225,7 +225,7 @@ private List<DaemonInfo> getCompatibleDaemons(Iterable<DaemonInfo> daemons, Daem
private DaemonClientConnection findConnection(List<DaemonInfo> compatibleDaemons) {
for (DaemonInfo daemon : compatibleDaemons) {
try {
return connectToDaemon(daemon, new CleanupOnStaleAddress(daemon, true));
return connectToDaemon(daemon, new CleanupOnStaleAddress(daemon), false);
} catch (DaemonException.ConnectException e) {
LOGGER.debug("Cannot connect to daemon {} due to {}. Trying a different daemon...", daemon, e);
}
Expand All @@ -238,7 +238,7 @@ public DaemonClientConnection startDaemon(DaemonCompatibilitySpec constraint) {
LOGGER.debug("Started Maven daemon {}", daemon);
long start = System.currentTimeMillis();
do {
DaemonClientConnection daemonConnection = connectToDaemonWithId(daemon);
DaemonClientConnection daemonConnection = connectToDaemonWithId(daemon, true);
if (daemonConnection != null) {
return daemonConnection;
}
Expand Down Expand Up @@ -277,11 +277,8 @@ private String startDaemon() {
args.add("-Dlogback.configurationFile=" + layout.getLogbackConfigurationPath());
args.add("-Ddaemon.uid=" + uid);
args.add("-Xmx4g");
final String timeout = Environment.DAEMON_IDLE_TIMEOUT.systemProperty().asString();
if (timeout != null) {
args.add(Environment.DAEMON_IDLE_TIMEOUT.asCommandLineProperty(timeout));
}

args.add(Environment.DAEMON_IDLE_TIMEOUT_MS.asCommandLineProperty(Integer.toString(layout.getIdleTimeoutMs())));
args.add(Environment.DAEMON_KEEP_ALIVE_MS.asCommandLineProperty(Integer.toString(layout.getKeepAliveMs())));
args.add(MavenDaemon.class.getName());
command = String.join(" ", args);

Expand Down Expand Up @@ -314,13 +311,14 @@ private String findJars(Path mavenHome, Predicate<Path>... filters) {
}
}

private DaemonClientConnection connectToDaemonWithId(String daemon) throws DaemonException.ConnectException {
private DaemonClientConnection connectToDaemonWithId(String daemon, boolean newDaemon)
throws DaemonException.ConnectException {
// Look for 'our' daemon among the busy daemons - a daemon will start in busy state so that nobody else will
// grab it.
DaemonInfo daemonInfo = registry.get(daemon);
if (daemonInfo != null) {
try {
return connectToDaemon(daemonInfo, new CleanupOnStaleAddress(daemonInfo, false));
return connectToDaemon(daemonInfo, new CleanupOnStaleAddress(daemonInfo), newDaemon);
} catch (DaemonException.ConnectException e) {
DaemonDiagnostics diag = new DaemonDiagnostics(daemon, layout.daemonLog(daemon));
throw new DaemonException.ConnectException("Could not connect to the Maven daemon.\n" + diag.describe(), e);
Expand All @@ -330,11 +328,13 @@ private DaemonClientConnection connectToDaemonWithId(String daemon) throws Daemo
}

private DaemonClientConnection connectToDaemon(DaemonInfo daemon,
DaemonClientConnection.StaleAddressDetector staleAddressDetector) throws DaemonException.ConnectException {
DaemonClientConnection.StaleAddressDetector staleAddressDetector, boolean newDaemon)
throws DaemonException.ConnectException {
LOGGER.debug("Connecting to Daemon");
try {
int maxKeepAliveMs = layout.getKeepAliveMs() * layout.getMaxLostKeepAlive();
DaemonConnection<Message> connection = connect(daemon.getAddress());
return new DaemonClientConnection(connection, daemon, staleAddressDetector);
return new DaemonClientConnection(connection, daemon, staleAddressDetector, newDaemon, maxKeepAliveMs);
} catch (DaemonException.ConnectException e) {
staleAddressDetector.maybeStaleAddress(e);
throw e;
Expand All @@ -345,11 +345,9 @@ private DaemonClientConnection connectToDaemon(DaemonInfo daemon,

private class CleanupOnStaleAddress implements DaemonClientConnection.StaleAddressDetector {
private final DaemonInfo daemon;
private final boolean exposeAsStale;

public CleanupOnStaleAddress(DaemonInfo daemon, boolean exposeAsStale) {
public CleanupOnStaleAddress(DaemonInfo daemon) {
this.daemon = daemon;
this.exposeAsStale = exposeAsStale;
}

@Override
Expand All @@ -360,7 +358,7 @@ public boolean maybeStaleAddress(Exception failure) {
"by user or operating system");
registry.storeStopEvent(stopEvent);
registry.remove(daemon.getUid());
return exposeAsStale;
return true;
}
}

Expand Down

0 comments on commit 00db8fd

Please sign in to comment.