Skip to content

Commit

Permalink
ClusteredEventBus racy initialization - fixes #2438 - fixes #2439
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed May 11, 2018
1 parent 76be009 commit a0013c0
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 89 deletions.
Expand Up @@ -72,7 +72,6 @@ public class ClusteredEventBus extends EventBusImpl {
private static final String SUBS_MAP_NAME = "__vertx.subs";

private final ClusterManager clusterManager;
private final HAManager haManager;
private final ConcurrentMap<ServerID, ConnectionHolder> connections = new ConcurrentHashMap<>();
private final Context sendNoContext;

Expand All @@ -85,14 +84,11 @@ public class ClusteredEventBus extends EventBusImpl {

public ClusteredEventBus(VertxInternal vertx,
VertxOptions options,
ClusterManager clusterManager,
HAManager haManager) {
ClusterManager clusterManager) {
super(vertx);
this.options = options.getEventBusOptions();
this.clusterManager = clusterManager;
this.haManager = haManager;
this.sendNoContext = vertx.getOrCreateContext();
setClusterViewChangedHandler(haManager);
}

private NetServerOptions getServerOptions() {
Expand Down Expand Up @@ -132,6 +128,9 @@ static void setTrustOptions(TCPSSLOptions sslOptions, TrustOptions options) {

@Override
public void start(Handler<AsyncResult<Void>> resultHandler) {
// Get the HA manager, it has been constructed but it's not yet initialized
HAManager haManager = vertx.haManager();
setClusterViewChangedHandler(haManager);
clusterManager.<String, ClusterNodeInfo>getAsyncMultiMap(SUBS_MAP_NAME, ar2 -> {
if (ar2.succeeded()) {
subs = ar2.result();
Expand Down
12 changes: 8 additions & 4 deletions src/main/java/io/vertx/core/impl/HAManager.java
Expand Up @@ -133,11 +133,16 @@ public HAManager(VertxInternal vertx, DeploymentManager deploymentManager,
this.quorumSize = enabled ? quorumSize : 0;
this.group = enabled ? group : "__DISABLED__";
this.enabled = enabled;
this.haInfo = new JsonObject();
haInfo.put("verticles", new JsonArray());
haInfo.put("group", this.group);
this.haInfo = new JsonObject().put("verticles", new JsonArray()).put("group", this.group);
this.clusterMap = clusterManager.getSyncMap(CLUSTER_MAP_NAME);
this.nodeID = clusterManager.getNodeID();
}

/**
* Initialize the ha manager, i.e register the node listener to propagates the node events and
* start the quorum timer. The quorum will be checked as well.
*/
void init() {
synchronized (haInfo) {
clusterMap.put(nodeID, haInfo.encode());
}
Expand All @@ -146,7 +151,6 @@ public HAManager(VertxInternal vertx, DeploymentManager deploymentManager,
public void nodeAdded(String nodeID) {
HAManager.this.nodeAdded(nodeID);
}

@Override
public void nodeLeft(String leftNodeID) {
HAManager.this.nodeLeft(leftNodeID);
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/vertx/core/impl/VertxFactoryImpl.java
Expand Up @@ -27,22 +27,22 @@ public class VertxFactoryImpl implements VertxFactory {

@Override
public Vertx vertx() {
return new VertxImpl();
return vertx(new VertxOptions());
}

@Override
public Vertx vertx(VertxOptions options) {
if (options.isClustered()) {
throw new IllegalArgumentException("Please use Vertx.clusteredVertx() to create a clustered Vert.x instance");
}
return new VertxImpl(options);
return VertxImpl.vertx(options);
}

@Override
public void clusteredVertx(VertxOptions options, final Handler<AsyncResult<Vertx>> resultHandler) {
// We don't require the user to set clustered to true if they use this method
options.setClustered(true);
new VertxImpl(options, resultHandler);
VertxImpl.clusteredVertx(options, resultHandler);
}

@Override
Expand Down
135 changes: 58 additions & 77 deletions src/main/java/io/vertx/core/impl/VertxImpl.java
Expand Up @@ -107,6 +107,17 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
System.setProperty("io.netty.noJdkZlibDecoder", "false");
}

static VertxImpl vertx(VertxOptions options) {
VertxImpl vertx = new VertxImpl(options);
vertx.init();
return vertx;
}

static void clusteredVertx(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler) {
VertxImpl vertx = new VertxImpl(options);
vertx.initClustered(options, resultHandler);
}

private final FileSystem fileSystem = getFileSystem();
private final SharedData sharedData;
private final VertxMetrics metrics;
Expand All @@ -123,11 +134,10 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
private final EventLoopGroup eventLoopGroup;
private final EventLoopGroup acceptorEventLoopGroup;
private final BlockedThreadChecker checker;
private final boolean haEnabled;
private final AddressResolver addressResolver;
private final AddressResolverOptions addressResolverOptions;
private EventBus eventBus;
private HAManager haManager;
private final EventBus eventBus;
private volatile HAManager haManager;
private boolean closed;
private volatile Handler<Throwable> exceptionHandler;
private final Map<String, SharedWorkerPool> namedWorkerPools;
Expand All @@ -136,15 +146,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
private final CloseHooks closeHooks;
private final Transport transport;

VertxImpl() {
this(new VertxOptions());
}

VertxImpl(VertxOptions options) {
this(options, null);
}

VertxImpl(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler) {
private VertxImpl(VertxOptions options) {
// Sanity check
if (Vertx.currentContext() != null) {
log.warn("You're already on a Vert.x context, are you sure you want to create a new Vertx instance?");
Expand Down Expand Up @@ -186,42 +188,42 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
this.addressResolverOptions = options.getAddressResolverOptions();
this.addressResolver = new AddressResolver(this, options.getAddressResolverOptions());
this.deploymentManager = new DeploymentManager(this);
this.haEnabled = options.isClustered() && options.isHAEnabled();
if (options.isClustered()) {
this.clusterManager = getClusterManager(options);
this.clusterManager.setVertx(this);
this.clusterManager.join(ar -> {
if (ar.failed()) {
log.error("Failed to join cluster", ar.cause());
resultHandler.handle(Future.failedFuture(ar.cause()));
} else {
// Provide a memory barrier as we are setting from a different thread
synchronized (VertxImpl.this) {
haManager = new HAManager(this, deploymentManager, clusterManager, options.getQuorumSize(),
options.getHAGroup(), haEnabled);
createAndStartEventBus(options, resultHandler);
}
}
});
this.eventBus = new ClusteredEventBus(this, options, clusterManager);
} else {
this.clusterManager = null;
createAndStartEventBus(options, resultHandler);
this.eventBus = new EventBusImpl(this);
}
this.sharedData = new SharedDataImpl(this, clusterManager);
}

private void createAndStartEventBus(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler) {
if (options.isClustered()) {
eventBus = new ClusteredEventBus(this, options, clusterManager, haManager);
} else {
eventBus = new EventBusImpl(this);
}
eventBus.start(ar -> {
if (ar.succeeded()) {
if (resultHandler != null) resultHandler.handle(Future.succeededFuture(this));
private void init() {
eventBus.start(ar -> {});
}

private void initClustered(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler) {
clusterManager.setVertx(this);
clusterManager.join(ar1 -> {
if (ar1.failed()) {
log.error("Failed to join cluster", ar1.cause());
resultHandler.handle(Future.failedFuture(ar1.cause()));
} else {
log.error("Failed to start event bus", ar.cause());
if (resultHandler != null) resultHandler.handle(Future.failedFuture(ar.cause()));
haManager = new HAManager(this, deploymentManager, clusterManager, options.getQuorumSize(), options.getHAGroup(), options.isHAEnabled());
eventBus.start(ar2 -> {
AsyncResult<Vertx> res;
if (ar2.succeeded()) {
// Init the manager (i.e register listener and check the quorum)
// after the event bus has been fully started and updated its state
// it will have also set the clustered changed view handler on the ha manager
haManager.init();
res = Future.succeededFuture(this);
} else {
log.error("Failed to start event bus", ar2.cause());
res = Future.failedFuture(ar2.cause());
}
resultHandler.handle(res);
});
}
});
}
Expand Down Expand Up @@ -298,13 +300,6 @@ public HttpClient createHttpClient() {
}

public EventBus eventBus() {
if (eventBus == null) {
// If reading from different thread possibility that it's been set but not visible - so provide
// memory barrier
synchronized (this) {
return eventBus;
}
}
return eventBus;
}

Expand Down Expand Up @@ -441,31 +436,25 @@ private VertxMetrics initialiseMetrics(VertxOptions options) {
}

private ClusterManager getClusterManager(VertxOptions options) {
if (options.isClustered()) {
if (options.getClusterManager() != null) {
return options.getClusterManager();
ClusterManager mgr = options.getClusterManager();
if (mgr == null) {
String clusterManagerClassName = System.getProperty("vertx.cluster.managerClass");
if (clusterManagerClassName != null) {
// We allow specify a sys prop for the cluster manager factory which overrides ServiceLoader
try {
Class<?> clazz = Class.forName(clusterManagerClassName);
mgr = (ClusterManager) clazz.newInstance();
} catch (Exception e) {
throw new IllegalStateException("Failed to instantiate " + clusterManagerClassName, e);
}
} else {
ClusterManager mgr;
String clusterManagerClassName = System.getProperty("vertx.cluster.managerClass");
if (clusterManagerClassName != null) {
// We allow specify a sys prop for the cluster manager factory which overrides ServiceLoader
try {
Class<?> clazz = Class.forName(clusterManagerClassName);
mgr = (ClusterManager) clazz.newInstance();
} catch (Exception e) {
throw new IllegalStateException("Failed to instantiate " + clusterManagerClassName, e);
}
} else {
mgr = ServiceHelper.loadFactoryOrNull(ClusterManager.class);
if (mgr == null) {
throw new IllegalStateException("No ClusterManagerFactory instances found on classpath");
}
mgr = ServiceHelper.loadFactoryOrNull(ClusterManager.class);
if (mgr == null) {
throw new IllegalStateException("No ClusterManagerFactory instances found on classpath");
}
return mgr;
}
} else {
return null;
}
return mgr;
}

private long scheduleTimeout(ContextImpl context, Handler<Long> handler, long delay, boolean periodic) {
Expand Down Expand Up @@ -832,16 +821,8 @@ public void operationComplete(io.netty.util.concurrent.Future future) throws Exc
});
}

private HAManager haManager() {
// If reading from different thread possibility that it's been set but not visible - so provide
// memory barrier
if (haManager == null && haEnabled) {
synchronized (this) {
return haManager;
}
} else {
return haManager;
}
public HAManager haManager() {
return haManager;
}

private class InternalTimerHandler implements Handler<Void>, Closeable {
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/vertx/core/impl/VertxInternal.java
Expand Up @@ -102,6 +102,8 @@ public interface VertxInternal extends Vertx {

ClusterManager getClusterManager();

HAManager haManager();

/**
* Resolve an address (e.g. {@code vertx.io} into the first found A (IPv4) or AAAA (IPv6) record.
*
Expand Down

0 comments on commit a0013c0

Please sign in to comment.