Skip to content

Commit

Permalink
Fix #1972
Browse files Browse the repository at this point in the history
Add:
* beforeStoppingVertx - called during the close method of vert.x (CloseHook)
* afterStoppingVertx - called once vert.x has been closed

Signed-off-by: Clement Escoffier <clement.escoffier@gmail.com>
  • Loading branch information
cescoffier committed May 8, 2017
1 parent 16679f1 commit ae18df3
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 22 deletions.
10 changes: 10 additions & 0 deletions src/main/java/io/vertx/core/Launcher.java
Expand Up @@ -94,6 +94,16 @@ public void beforeDeployingVerticle(DeploymentOptions deploymentOptions) {

}

@Override
public void beforeStoppingVertx(Vertx vertx) {

}

@Override
public void afterStoppingVertx() {

}

/**
* A deployment failure has been encountered. You can override this method to customize the behavior.
* By default it closes the `vertx` instance.
Expand Down
25 changes: 21 additions & 4 deletions src/main/java/io/vertx/core/impl/launcher/VertxLifecycleHooks.java
Expand Up @@ -28,33 +28,50 @@
public interface VertxLifecycleHooks {

/**
* Hook for sub-classes of the starter class before the vertx instance is started. Options can still be updated.
* Hook for sub-classes of the {@link io.vertx.core.Launcher} class before the vertx instance is started. Options
* can still be updated.
*
* @param config the json config file passed via -conf on the command line, an empty json object is not set.
*/
void afterConfigParsed(JsonObject config);

/**
* Hook for sub-classes of the starter class before the vertx instance is started. Options can still be updated.
* Hook for sub-classes of the {@link io.vertx.core.Launcher} class before the vertx instance is started. Options
* can still be updated.
*
* @param options the vert.x options
*/
void beforeStartingVertx(VertxOptions options);

/**
* Hook for sub-classes of the starter class after the vertx instance is started.
* Hook for sub-classes of the {@link io.vertx.core.Launcher} class after the vertx instance is started.
*
* @param vertx the vert.x instance
*/
void afterStartingVertx(Vertx vertx);

/**
* Hook for sub classes of the starter class before the verticle is deployed. Deployment options can still be updated.
* Hook for sub classes of the {@link io.vertx.core.Launcher} class before the verticle is deployed. Deployment
* options can still be updated.
*
* @param deploymentOptions the deployment options
*/
void beforeDeployingVerticle(DeploymentOptions deploymentOptions);

/**
* Hook for sub classes of the {@link io.vertx.core.Launcher} class called before the {@link Vertx} instance is
* terminated. The hook is called during the {@link Vertx#close()} method.
*
* @param vertx the {@link Vertx} instance, cannot be {@code null}
*/
void beforeStoppingVertx(Vertx vertx);

/**
* Hook for sub classes of the {@link io.vertx.core.Launcher} class called after the {@link Vertx} instance has been
* terminated. The hook is called after the {@link Vertx#close()} method.
*/
void afterStoppingVertx();

/**
* A deployment failure has been encountered. You can override this method to customize the behavior.
*
Expand Down
57 changes: 45 additions & 12 deletions src/main/java/io/vertx/core/impl/launcher/commands/BareCommand.java
Expand Up @@ -18,6 +18,7 @@
import io.vertx.core.*;
import io.vertx.core.cli.annotations.*;
import io.vertx.core.impl.launcher.VertxLifecycleHooks;
import io.vertx.core.logging.Logger;
import io.vertx.core.metrics.MetricsOptions;
import io.vertx.core.spi.VertxMetricsFactory;
import io.vertx.core.spi.launcher.ExecutionContext;
Expand All @@ -40,7 +41,7 @@
*/
@Summary("Creates a bare instance of vert.x.")
@Description("This command launches a vert.x instance but do not deploy any verticles. It will " +
"receive a verticle if another node of the cluster dies.")
"receive a verticle if another node of the cluster dies.")
@Name("bare")
public class BareCommand extends ClasspathHandler {

Expand All @@ -58,14 +59,16 @@ public class BareCommand extends ClasspathHandler {

protected VertxOptions options;

protected Runnable finalAction;

/**
* Sets the quorum option.
*
* @param quorum the quorum, default to 1.
*/
@Option(longName = "quorum", argName = "q")
@Description("Used in conjunction with -ha this specifies the minimum number of nodes in the cluster for any HA " +
"deploymentIDs to be active. Defaults to 1.")
"deploymentIDs to be active. Defaults to 1.")
@DefaultValue("-1")
public void setQuorum(int quorum) {
this.quorum = quorum;
Expand All @@ -78,7 +81,7 @@ public void setQuorum(int quorum) {
*/
@Option(longName = "hagroup", argName = "group")
@Description("used in conjunction with -ha this specifies the HA group this node will join. There can be multiple " +
"HA groups in a cluster. Nodes will only failover to other nodes in the same group. Defaults to '__DEFAULT__'.")
"HA groups in a cluster. Nodes will only failover to other nodes in the same group. Defaults to '__DEFAULT__'.")
@DefaultValue("__DEFAULT__")
public void setHAGroup(String group) {
this.haGroup = group;
Expand All @@ -103,7 +106,7 @@ public void setClusterPort(int port) {
*/
@Option(longName = "cluster-host", argName = "host")
@Description("host to bind to for cluster communication. If this is not specified vert.x will attempt to choose one" +
" from the available interfaces.")
" from the available interfaces.")
public void setClusterHost(String host) {
this.clusterHost = host;
}
Expand All @@ -129,6 +132,16 @@ public boolean getHA() {
*/
@Override
public void run() {
this.run(null);
}

/**
* Starts the vert.x instance and sets the final action (called when vert.x is closed).
*
* @param action the action, can be {@code null}
*/
public void run(Runnable action) {
this.finalAction = action;
vertx = startVertx();
}

Expand Down Expand Up @@ -192,7 +205,7 @@ protected Vertx startVertx() {
} else {
instance = create(options);
}
addShutdownHook();
addShutdownHook(instance, log, finalAction);
afterStartingVertx(instance);
return instance;
}
Expand Down Expand Up @@ -298,12 +311,29 @@ private Method getSetter(String fieldName, Class<?> clazz) {
}

/**
* Registers a shutdown hook closing the vert.x instance when the JVM is terminating.
* Registers a shutdown hook closing the given vert.x instance when the JVM is terminating.
* Optionally, an action can be executed after the termination of the {@link Vertx} instance.
*
* @param vertx the vert.x instance, must not be {@code null}
* @param log the log, must not be {@code null}
* @param action the action, may be {@code null}
*/
protected static void addShutdownHook(Vertx vertx, Logger log, Runnable action) {
Runtime.getRuntime().addShutdownHook(new Thread(getTerminationRunnable(vertx, log, action)));
}

/**
* Gets the termination runnable used to close the Vert.x instance.
*
* @param vertx the vert.x instance, must not be {@code null}
* @param log the log, must not be {@code null}
* @param action the action, may be {@code null}
*/
protected void addShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
CountDownLatch latch = new CountDownLatch(1);
public static Runnable getTerminationRunnable(Vertx vertx, Logger log, Runnable action) {
return () -> {
CountDownLatch latch = new CountDownLatch(1);
System.out.println("Shutdown hook ! " + vertx);
if (vertx != null) {
vertx.close(ar -> {
if (!ar.succeeded()) {
log.error("Failure in stopping Vert.x", ar.cause());
Expand All @@ -314,11 +344,14 @@ public void run() {
if (!latch.await(2, TimeUnit.MINUTES)) {
log.error("Timed out waiting to undeploy all");
}
if (action != null) {
action.run();
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
});
};
}

/**
Expand All @@ -340,7 +373,7 @@ protected String getDefaultAddress() {
while (addresses.hasMoreElements()) {
InetAddress address = addresses.nextElement();
if (!address.isAnyLocalAddress() && !address.isMulticastAddress()
&& !(address instanceof Inet6Address)) {
&& !(address instanceof Inet6Address)) {
return address.getHostAddress();
}
}
Expand Down
32 changes: 29 additions & 3 deletions src/main/java/io/vertx/core/impl/launcher/commands/RunCommand.java
Expand Up @@ -15,11 +15,11 @@
*/
package io.vertx.core.impl.launcher.commands;

import io.vertx.core.DeploymentOptions;
import io.vertx.core.Handler;
import io.vertx.core.*;
import io.vertx.core.cli.CLIException;
import io.vertx.core.cli.CommandLine;
import io.vertx.core.cli.annotations.*;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.launcher.VertxLifecycleHooks;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonObject;
Expand Down Expand Up @@ -250,11 +250,23 @@ public void run() {
}
afterConfigParsed(conf);

super.run();
super.run(this::afterStoppingVertx);
if (vertx == null) {
// Already logged.
ExecUtils.exitBecauseOfVertxInitializationIssue();
}

if (vertx instanceof VertxInternal) {
((VertxInternal) vertx).addCloseHook(completionHandler -> {
try {
beforeStoppingVertx(vertx);
completionHandler.handle(Future.succeededFuture());
} catch (Exception e) {
completionHandler.handle(Future.failedFuture(e));
}
});
}

deploymentOptions = new DeploymentOptions();
configureFromSystemProperties(deploymentOptions, DEPLOYMENT_OPTIONS_PROP_PREFIX);
deploymentOptions.setConfig(conf).setWorker(worker).setHa(ha).setInstances(instances);
Expand Down Expand Up @@ -442,4 +454,18 @@ protected void afterConfigParsed(JsonObject config) {
((VertxLifecycleHooks) main).afterConfigParsed(config);
}
}

protected void beforeStoppingVertx(Vertx vertx) {
final Object main = executionContext.main();
if (main instanceof VertxLifecycleHooks) {
((VertxLifecycleHooks) main).beforeStoppingVertx(vertx);
}
}

protected void afterStoppingVertx() {
final Object main = executionContext.main();
if (main instanceof VertxLifecycleHooks) {
((VertxLifecycleHooks) main).afterStoppingVertx();
}
}
}
Expand Up @@ -32,7 +32,7 @@

public class DefaultCommandTest {

HelloCommand command = new HelloCommand();
private HelloCommand command = new HelloCommand();

private CommandLine parse(CLI cli, String... args) throws CLIException {
return cli.parse(Arrays.asList(args));
Expand Down Expand Up @@ -94,4 +94,4 @@ public void run() throws CLIException {
}
}

}
}
Expand Up @@ -21,17 +21,21 @@
import io.vertx.core.VertxOptions;
import io.vertx.core.cli.CLIException;
import io.vertx.core.cli.annotations.Name;
import io.vertx.core.impl.launcher.commands.BareCommand;
import io.vertx.core.impl.launcher.commands.CommandTestBase;
import io.vertx.core.impl.launcher.commands.HttpTestVerticle;
import io.vertx.core.impl.launcher.commands.RunCommandTest;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.spi.launcher.CommandFactory;
import io.vertx.core.spi.launcher.DefaultCommand;
import io.vertx.core.spi.launcher.DefaultCommandFactory;
import org.junit.After;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -111,7 +115,11 @@ protected void load() {
}

@Test
public void testThatCustomLauncherCanCustomizeTheClusteredOption() {
public void testThatCustomLauncherCanCustomizeTheClusteredOption() throws InterruptedException {

AtomicBoolean asv = new AtomicBoolean();
AtomicBoolean bsv = new AtomicBoolean();

Launcher myLauncher = new Launcher() {
@Override
protected String getMainVerticle() {
Expand All @@ -127,6 +135,16 @@ public void afterStartingVertx(Vertx vertx) {
public void beforeStartingVertx(VertxOptions options) {
options.setClustered(true);
}

@Override
public void afterStoppingVertx() {
asv.set(true);
}

@Override
public void beforeStoppingVertx(Vertx vertx) {
bsv.set(vertx != null);
}
};

myLauncher.dispatch(new String[0]);
Expand All @@ -139,6 +157,13 @@ public void beforeStartingVertx(VertxOptions options) {
});

assertThat(this.vertx.isClustered()).isTrue();

BareCommand.getTerminationRunnable(vertx, LoggerFactory.getLogger("foo"), () -> {
asv.set(true);
}).run();

assertThat(asv.get()).isTrue();
vertx = null;
}

@Test
Expand Down

0 comments on commit ae18df3

Please sign in to comment.